Spring Integration MQTT 集成到 Spring Boot
时间: 2024-03-29 16:16:52 浏览: 111
mqtt-demo:基于spring-boot2.0.3,eclipse.paho.mqttv3.1.2的接收与推送集成
5星 · 资源好评率100%
将 Spring Integration 集成到 Spring Boot 中,可以用来实现 MQTT 的消息传递,下面是一个简单的例子:
1. 添加 Maven 依赖
```
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
```
2. 配置 MQTT 连接
```
/**
 * Spring Integration wiring for MQTT: a shared Paho client factory, an inbound
 * (subscribing) channel adapter and an outbound (publishing) message handler.
 *
 * <p>Message paths set up by this class:
 * <ul>
 *   <li>broker -> {@link #inbound()} -> {@link #mqttInputChannel()}, where the
 *       {@code @ServiceActivator} method of the {@code mqttHandler} bean is subscribed;</li>
 *   <li>{@link #mqttOutboundChannel()} -> {@link #mqttOutbound()} -> broker.</li>
 * </ul>
 *
 * <p>Imports that are easy to miss for this class:
 * {@code org.eclipse.paho.client.mqttv3.MqttConnectOptions},
 * {@code org.springframework.integration.endpoint.MessageProducerSupport},
 * {@code org.springframework.integration.annotation.ServiceActivator}.
 */
@Configuration
public class MqttConfig {

    /**
     * Appended to the configured client ID for the publishing client. A broker allows
     * only one live connection per client ID, so the inbound and outbound clients
     * must not share one.
     */
    private static final String OUTBOUND_CLIENT_ID_SUFFIX = "-outbound";

    private final MqttProperties mqttProperties;

    public MqttConfig(MqttProperties mqttProperties) {
        this.mqttProperties = mqttProperties;
    }

    /**
     * Builds the client factory shared by the inbound adapter and the outbound handler.
     *
     * <p>Connection settings go through Paho's {@code MqttConnectOptions}; the
     * per-property setters on {@code DefaultMqttPahoClientFactory} were deprecated in
     * favour of it and are not available in the 5.2.x line.
     *
     * @return factory configured with the broker URIs and optional credentials
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        // NOTE(review): assumes getUrls() returns String[] and getUsername()/getPassword()
        // return String - confirm against MqttProperties.
        options.setServerURIs(mqttProperties.getUrls());

        // Credentials are optional; skip them when not configured.
        String username = mqttProperties.getUsername();
        if (username != null) {
            options.setUserName(username);
        }
        String password = mqttProperties.getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /** Channel on which messages received from the broker are delivered. */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /** Channel whose messages are published to the broker by {@link #mqttOutbound()}. */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the adapter that subscribes to the configured topics.
     *
     * <p>The return type is {@code MessageProducerSupport} (a {@code MessageProducer})
     * because that is the type {@code IntegrationFlows.from(...)} accepts. No output
     * channel is set here: {@link #mqttInFlow()} assigns it.
     *
     * @return the message-driven MQTT adapter
     */
    @Bean
    public MessageProducerSupport inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        mqttProperties.getClientId(),
                        mqttClientFactory(),
                        mqttProperties.getTopics());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        return adapter;
    }

    /**
     * Creates the handler that publishes every message arriving on
     * {@code mqttOutboundChannel}; messages without a topic header go to the
     * configured default topic.
     *
     * @return the publishing message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(
                        mqttProperties.getClientId() + OUTBOUND_CLIENT_ID_SUFFIX,
                        mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
        return messageHandler;
    }

    /**
     * Routes everything produced by {@link #inbound()} to {@code mqttInputChannel}.
     *
     * <p>No payload transformation is needed: {@code DefaultPahoMessageConverter}
     * produces {@code String} payloads unless configured for bytes. The flow only
     * bridges to the channel, so the {@code @ServiceActivator} subscribed to
     * {@code mqttInputChannel} is the single consumer of each message.
     *
     * @return the inbound integration flow
     */
    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(inbound())
                .channel(mqttInputChannel())
                .get();
    }
}
```
3. 定义消息处理器
```
/**
 * Application-facing MQTT endpoint: logs messages delivered on
 * {@code mqttInputChannel} and sends outgoing payloads to the injected
 * {@code mqttOutboundChannel}.
 */
@Component("mqttHandler")
public class MqttHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqttHandler.class);

    /** Channel that outgoing payloads are sent to; injected by Spring. */
    @Autowired
    private MessageChannel mqttOutboundChannel;

    /**
     * Receives a message from {@code mqttInputChannel} and logs it at INFO level.
     *
     * @param message the received payload
     */
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleMessage(String message) {
        LOGGER.info("Received message: {}", message);
    }

    /**
     * Wraps the payload in a message and sends it to the outbound channel.
     *
     * @param payload the text to send
     */
    public void sendMessage(String payload) {
        mqttOutboundChannel.send(
                MessageBuilder.withPayload(payload).build());
    }
}
```
4. 在需要发送消息的地方注入 `MqttHandler`(Bean 名称为 `mqttHandler`),调用其实例方法 `sendMessage(String payload)` 即可。
这样就实现了 MQTT 的消息传递,并且集成到了 Spring Boot 中,使用起来非常方便。
阅读全文