使用Eclipse Paho MQTT 来订阅消息
时间: 2023-07-22 08:07:08 浏览: 186
MQTT发布/订阅消息机制
首先,你需要准备一个 MQTT 服务器来发布消息。接着,你需要在你的 Java 项目中添加 Eclipse Paho MQTT 客户端库的依赖。
在你的 Java 代码中,你需要创建一个 MQTT 客户端并连接到 MQTT 服务器。你可以使用以下代码来创建一个 MQTT 客户端:
```java
String brokerUrl = "tcp://localhost:1883"; // MQTT 服务器地址
String clientId = "clientId"; // 客户端 ID,可以是任何字符串
MqttClient client = new MqttClient(brokerUrl, clientId);
```
接着,你需要定义一个回调函数,用来处理收到的消息。你可以使用以下代码来定义一个回调函数:
```java
public class MyMqttCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
// 处理连接丢失的情况
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 处理收到的消息
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 处理消息发送完成的情况
}
}
```
在回调函数中,`messageArrived` 函数会在收到消息时被调用。在这个函数中,你可以编写处理消息的逻辑。
最后,你需要订阅一个或多个主题来等待消息的到来。你可以使用以下代码来订阅一个主题:
```java
String topic = "my/topic"; // 订阅的主题
int qos = 1; // 消息质量等级,0 表示最多一次,1 表示至少一次,2 表示恰好一次
client.subscribe(topic, qos);
```
完整示例代码:
```java
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSubscriber {
public static void main(String[] args) throws MqttException {
String brokerUrl = "tcp://localhost:1883"; // MQTT 服务器地址
String clientId = "clientId"; // 客户端 ID,可以是任何字符串
MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
client.setCallback(new MyMqttCallback()); // 设置回调函数
client.connect(); // 连接到 MQTT 服务器
String topic = "my/topic"; // 订阅的主题
int qos = 1; // 消息质量等级,0 表示最多一次,1 表示至少一次,2 表示恰好一次
client.subscribe(topic, qos); // 订阅主题
}
public static class MyMqttCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
// 处理连接丢失的情况
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 处理收到的消息
System.out.println("Received message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 处理消息发送完成的情况
}
}
}
```
阅读全文