Spring Boot 整合 MQTT
时间: 2023-11-28 13:44:43 浏览: 109
以下是Spring Boot整合MQTT的步骤:
1.在pom.xml文件中添加spring-integration-mqtt依赖:
```xml
<!-- Spring Integration MQTT support (channel adapters built on the Eclipse Paho client).
     No <version> is declared here, so it is presumably supplied by dependency management
     such as the Spring Boot parent POM; confirm for your build. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
```
2.在application.properties文件中添加MQTT相关配置:
```properties
## MQTT
# Broker URL; MqttConfig passes it to the inbound channel adapter as the server URL.
mqtt.host=tcp://192.168.10.198:1883
# Client identifier the inbound channel adapter connects with.
mqtt.clientId=ClientId_local
# Credentials applied to the MQTT connection options.
# NOTE(review): sample values only; real secrets should not live in this file.
mqtt.username=admin
mqtt.password=123456
# Passed to MqttConnectOptions.setConnectionTimeout (seconds per the Paho Javadoc; confirm).
mqtt.timeout=10
# Passed to MqttConnectOptions.setKeepAliveInterval (seconds per the Paho Javadoc; confirm).
mqtt.keepalive=20
# Topic filter the inbound adapter subscribes to; a trailing "#" is the MQTT multi-level wildcard.
mqtt.topic1=A/pick/warn/#
```
3.创建一个MQTT配置类,用于配置MQTT连接工厂和消息通道:
```java
/**
 * Spring Integration configuration that subscribes to an MQTT topic and hands every
 * received message to a handler which prints its payload.
 *
 * <p>All connection settings are read from the {@code mqtt.*} properties. Only an inbound
 * (receiving) adapter is defined in this class.
 */
@Configuration
@EnableIntegration
public class MqttConfig {

    /** Broker URL, from {@code mqtt.host}. */
    @Value("${mqtt.host}")
    private String host;

    /** Client identifier, from {@code mqtt.clientId}. */
    @Value("${mqtt.clientId}")
    private String clientId;

    /** User name for the connection, from {@code mqtt.username}. */
    @Value("${mqtt.username}")
    private String username;

    /** Password for the connection, from {@code mqtt.password}. */
    @Value("${mqtt.password}")
    private String password;

    /** Connection timeout value, from {@code mqtt.timeout}. */
    @Value("${mqtt.timeout}")
    private int timeout;

    /** Keep-alive interval value, from {@code mqtt.keepalive}. */
    @Value("${mqtt.keepalive}")
    private int keepalive;

    /** Topic filter to subscribe to, from {@code mqtt.topic1}. */
    @Value("${mqtt.topic1}")
    private String topic1;

    /**
     * Builds the connection options: clean session enabled, plus the configured
     * credentials, connection timeout and keep-alive interval.
     *
     * @return the populated connection options
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        options.setKeepAliveInterval(keepalive);
        options.setConnectionTimeout(timeout);
        return options;
    }

    /**
     * Creates the Paho client factory and applies the connection options to it.
     *
     * @return the configured client factory
     */
    @Bean
    public DefaultMqttPahoClientFactory getDefaultMqttPahoClientFactory() {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = getMqttConnectOptions();
        clientFactory.setConnectionOptions(options);
        return clientFactory;
    }

    /**
     * Channel that the inbound adapter writes received messages to.
     *
     * @return a new {@code DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the message-driven adapter for the configured broker URL, client id and
     * topic. It uses QoS 1, a completion timeout of 5000, the default Paho message
     * converter, and {@link #mqttInputChannel()} as its output channel.
     *
     * @return the inbound adapter
     */
    @Bean
    public MessageProducer inbound() {
        DefaultMqttPahoClientFactory clientFactory = getDefaultMqttPahoClientFactory();
        MqttPahoMessageDrivenChannelAdapter inboundAdapter =
                new MqttPahoMessageDrivenChannelAdapter(host, clientId, clientFactory, topic1);
        inboundAdapter.setOutputChannel(mqttInputChannel());
        inboundAdapter.setConverter(new DefaultPahoMessageConverter());
        inboundAdapter.setQos(1);
        inboundAdapter.setCompletionTimeout(5000);
        return inboundAdapter;
    }

    /**
     * Handler that prints the payload of each message to standard output.
     *
     * @return the message handler
     */
    @Bean
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }

    /**
     * Wires {@link #mqttInputChannel()} to {@link #handler()}.
     *
     * @return the integration flow from the input channel to the handler
     */
    @Bean
    public IntegrationFlow mqttInFlow() {
        MessageChannel source = mqttInputChannel();
        MessageHandler sink = handler();
        return IntegrationFlows.from(source).handle(sink).get();
    }
}
```
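4.在需要使用MQTT的地方注入MessageChannel,然后发送消息即可。注意:这里注入的mqttInputChannel是上面配置的入站通道,发送到它的消息会直接交给handler处理,并不会发布到MQTT服务器;如需向MQTT服务器发布消息,还需另外配置出站通道和MqttPahoMessageHandler: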
```java
// Injected by Spring. In the configuration shown in this file, the bean with this name is
// the channel that the inbound adapter writes to and that the flow connects to the handler.
@Autowired
private MessageChannel mqttInputChannel;

/**
 * Wraps the given text as a message payload and sends it to {@code mqttInputChannel}.
 *
 * <p>NOTE(review): with the configuration shown in this file the message is delivered to
 * the handler subscribed to that channel; no outbound MQTT adapter is visible here, so
 * this presumably does not publish to the broker. Confirm against the full project.
 *
 * @param message text to use as the message payload
 */
public void sendMessage(String message) {
    mqttInputChannel.send(
            MessageBuilder
                    .withPayload(message)
                    .build());
}
```
阅读全文