Java MQTT 实现
时间: 2024-01-23 11:15:07 浏览: 90
以下是两种Java实现MQTT的方式:
1. 使用Paho Java库实现MQTT:
```java
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttClientExample {
public static void main(String[] args) {
String broker = "tcp://mqtt.eclipse.org:1883";
String clientId = "JavaExample";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
mqttClient.connect(connOpts);
System.out.println("Connected to MQTT broker");
String topic = "test/topic";
int qos = 1;
mqttClient.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);
mqttClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("Connection lost");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Received message: " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered");
}
});
// Publish a message
String payload = "Hello, MQTT!";
mqttClient.publish(topic, new MqttMessage(payload.getBytes()));
System.out.println("Published message: " + payload);
// Disconnect from the broker
mqttClient.disconnect();
System.out.println("Disconnected from MQTT broker");
} catch (MqttException e) {
e.printStackTrace();
}
}
}
```
2. 使用Spring Boot实现MQTT:
首先,需要在pom.xml文件中添加以下依赖:
```xml
<!-- Spring Boot starter for Spring Integration. No <version> is declared here;
     NOTE(review): presumably resolved by the Spring Boot parent/BOM - confirm in the full pom.xml. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- Spring Integration MQTT module, which provides the org.springframework.integration.mqtt.*
     classes imported by the configuration class. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
```
然后,创建一个MqttConfig类来配置MQTT连接:
```java
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Spring Integration wiring for MQTT: one inbound adapter that forwards messages from
 * {@link #TOPIC} to {@code mqttInputChannel}, and one outbound handler that publishes
 * everything sent to {@code mqttOutputChannel} to the same topic.
 */
@Configuration
@EnableIntegration
public class MqttConfig {
    // The former mqtt.eclipse.org sandbox has been retired; this is its replacement host.
    private static final String BROKER = "tcp://mqtt.eclipseprojects.io:1883";
    private static final String CLIENT_ID = "SpringBootExample";
    private static final String TOPIC = "test/topic";

    // The inbound adapter and the outbound handler each open their own broker connection.
    // A broker allows only one connection per client id, so they must not share one.
    private static final String INBOUND_CLIENT_ID = CLIENT_ID + "-inbound";
    private static final String OUTBOUND_CLIENT_ID = CLIENT_ID + "-outbound";

    /**
     * Creates the Paho client factory shared by the inbound and outbound adapters.
     *
     * @return a factory whose connection options point at {@link #BROKER}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] {BROKER});

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /** Channel that receives messages arriving from the broker. */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter: subscribes to {@link #TOPIC} with QoS 1 and forwards each message
     * to {@link #mqttInputChannel()}.
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(INBOUND_CLIENT_ID, mqttClientFactory(), TOPIC);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /** Consumer of {@code mqttInputChannel}: prints the payload of every received message. */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            System.out.println("Received message: " + message.getPayload());
        };
    }

    /**
     * Gateway whose calls are sent to {@code mqttOutputChannel}.
     *
     * <p>{@code @Bean} is only applicable to methods, so it must not be placed on this
     * interface; the {@code @MessagingGateway} annotation alone marks it as a gateway.
     * NOTE(review): gateway interfaces are discovered via integration component scanning
     * (Spring Boot's integration auto-configuration normally enables it) - confirm for
     * your setup.
     */
    @MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
    public interface MqttGateway {
        /**
         * Sends the payload to {@code mqttOutputChannel}.
         *
         * @param payload message body to publish
         */
        void sendToMqtt(String payload);
    }

    /** Channel carrying messages that should be published to the broker. */
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    /**
     * Outbound handler: publishes messages from {@code mqttOutputChannel} to
     * {@link #TOPIC} unless a message specifies another topic.
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(OUTBOUND_CLIENT_ID, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(TOPIC);
        return messageHandler;
    }

    /** Template bound to {@link #mqttOutputChannel()} for sending messages programmatically. */
    @Bean
    public MessagingTemplate mqttMessageTemplate() {
        return new MessagingTemplate(mqttOutputChannel());
    }
}
```
最后,创建一个MqttClientService类来发送消息,并声明它所依赖的MqttGateway网关接口(消息经该接口发送到mqttOutputChannel):
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Service;
/**
 * Service facade for publishing messages to MQTT through the {@code MqttGateway}.
 */
@Service
public class MqttClientService {
    private final MqttGateway mqttGateway;

    /**
     * Constructor injection keeps the dependency final and makes the service usable
     * (and testable) without reflection-based field injection.
     *
     * @param mqttGateway gateway used to hand payloads to the MQTT outbound flow
     */
    @Autowired
    public MqttClientService(MqttGateway mqttGateway) {
        this.mqttGateway = mqttGateway;
    }

    /**
     * Sends the given payload through the gateway.
     *
     * @param payload message body to publish
     */
    public void sendMessage(String payload) {
        mqttGateway.sendToMqtt(payload);
    }
}
/**
 * Messaging gateway whose default request channel is {@code mqttOutputChannel}.
 *
 * <p>NOTE: as a public top-level type, Java requires this interface to live in its own
 * {@code MqttGateway.java} file rather than alongside another public class.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
/**
 * Sends the payload to the gateway's default request channel.
 *
 * @param payload message body to send
 */
void sendToMqtt(String payload);
}
```
以上是两种Java实现MQTT的方式,你可以根据自己的需求选择适合的方式进行实现。
阅读全文