编写一个 RocketMQ的使用示例
时间: 2024-12-11 15:11:55 浏览: 7
RocketMQ是一个开源的企业级消息中间件,它提供了高吞吐量、低延迟、可靠的消息传输服务。以下是一个简单的RocketMQ消费者(Consumer)使用Java编写的示例,这个例子展示了如何创建消费者实例、订阅主题并接收消息:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageExt;
public class RocketMQConsumerExample {
private static final String GROUP_ID = "your_group_id"; // 消费者组ID
private static final String QUEUE_NAME = "your_queue_name"; // 消息队列名
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_ID);
consumer.setNamesrvAddr("localhost:9876"); // 设置Nimbus地址或集群地址
// 创建topic订阅
consumer.subscribe(QUEUE_NAME, "#" + GROUP_ID); // 使用通配符(*)匹配所有分区
// 开始监听消息
consumer.start();
try {
while (true) {
MessageExt msg = consumer.fetchMessage(5000); // 获取一条消息,超时时间5秒
if (msg != null) {
System.out.printf("Received message from topic %s: %s%n", msg.getTopic(), msg.getBody().toString());
// 这里处理接收到的消息
msg.release(); // 放回池,以便其他消费者使用
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
```
在这个示例中,你需要替换`GROUP_ID`和`QUEUE_NAME`为实际的消费者组ID和要消费的主题名称。`setNamesrvAddr`用于设置Nimbus服务器地址,确保连接到RocketMQ集群。
运行这个程序后,消费者将开始从指定主题的每个分区接收消息。当有新消息到达时,它们会被打印出来。
阅读全文