rocketmq顺序消费模式,生产者代码,消费者代码
时间: 2023-05-27 19:04:30 浏览: 77
1. RocketMQ顺序消费模式生产者代码:
```
// 创建Producer实例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置name server的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer
producer.start();
// 创建Message
String topic = "topic_name";
String messageBody = "message_body";
Message message = new Message(topic, messageBody.getBytes());
// 设置顺序消息的Tag和Key
message.setTags("message_tag");
message.setKeys("message_key");
// 发送顺序消息,第三个参数为顺序消息的顺序Key
SendResult sendResult = producer.send(message, (mqs, msg, arg) -> {
int index = (int) arg;
int queueNum = mqs.size();
// 根据顺序Key获取消息队列索引
int queueIndex = index % queueNum;
return mqs.get(queueIndex);
}, 0);
// 输出发送结果
System.out.println(sendResult);
// 关闭Producer
producer.shutdown();
```
2. RocketMQ顺序消费模式消费者代码:
```
// 创建Consumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置name server的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题和Tag
String topic = "topic_name";
consumer.subscribe(topic, "message_tag");
// 设置顺序消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册消息监听器
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
// 消息处理逻辑
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
// 消费成功,返回成功标志
return ConsumeOrderlyStatus.SUCCESS;
});
// 启动Consumer
consumer.start();
// 等待消费结束
Thread.sleep(60000);
// 关闭Consumer
consumer.shutdown();
```
相关推荐
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)