rocketmq MessageExt 使用案例
时间: 2024-09-11 10:10:46 浏览: 51
测试rocketmq简单的使用
RocketMQ 是阿里巴巴开源的一个分布式消息中间件,MessageExt 是 RocketMQ 中的消息实体类,它是实际传递的数据结构。在 Java 中,你可以使用 `MessageExt` 来构建、发送和接收消息。
以下是一个简单的使用案例:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class RocketMQConsumerExample {
private static final String GROUP_ID = "your_group_id";
private static final String QUEUE_NAME = "your_queue_name";
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_ID);
try {
// 设置消费者属性
consumer.setNamesrvAddr("localhost:9876"); // RocketMQ broker地址
consumer.subscribe(QUEUE_NAME, "*"); // 订阅主题
// 开始消费
consumer.start();
while (true) {
Message msg = consumer.pull(); // 拉取消息
if (msg != null) {
System.out.printf("Received message from topic %s: %s%n",
msg.getTopic(), new String(msg.getBody()));
// 处理接收到的消息
consumer.commitOffset(msg); // 提交偏移量,确认已处理
} else {
break; // 如果没有新消息,退出循环
}
}
} catch (MQClientException | RemotingException e) {
e.printStackTrace();
} finally {
consumer.shutdown(); // 关闭消费者
}
}
}
```
在这个例子中,我们创建了一个 `DefaultMQPushConsumer` 实例,设置了服务器地址,订阅了指定的主题,然后开始拉取消息并进行处理。当消息被消费后,我们会提交偏移量以表示已经处理过。
阅读全文