RockMQ同步消费模式,代码示例
时间: 2023-05-27 07:03:40 浏览: 99
rocketmq实例代码
4星 · 用户满意度95%
以下是一个RockMQ的同步消费模式的Java代码示例:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class SyncConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化一个消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");
// 设置nameserver的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个消费主题
consumer.subscribe("example_topic", "*");
// 注册消费者监听器,处理消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
// 返回消息消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
System.out.println("Consumer started");
// 等待30秒,应用程序会退出
Thread.sleep(30000);
// 关闭消费者
consumer.shutdown();
}
}
```
此代码示例创建了一个同步消费者,当消息到达时立即处理它们。它订阅了一个名为“example_topic”的主题,将消息打印到控制台。然后代码示例启动消费者并等待30秒,然后停止消费者。
阅读全文