springboot中RocketMQ订阅同一个主题消息有序消费 代码
时间: 2024-02-24 21:00:15 浏览: 98
在Spring Boot中使用RocketMQ订阅同一个主题消息有序消费的代码示例:
1. 消息生产者
```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderProducer {
@Autowired
private DefaultMQProducer defaultMQProducer;
public void sendMsg(String msg, String tag) throws Exception {
Message message = new Message("OrderTopic", tag, msg.getBytes());
defaultMQProducer.send(message);
}
}
```
在消息生产者中,我们使用DefaultMQProducer发送消息。
2. 消息消费者
```java
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderlyContext;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class OrderConsumer implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt messageExt : msgs) {
String msg = new String(messageExt.getBody());
System.out.println("消费者接收到消息:" + msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
```
在消息消费者中,我们实现了MessageListenerOrderly接口,保证同一个消费者实例中的消息按照顺序进行消费。同时,我们将consumeOrderly属性设置为true,保证同一个消费者实例中的消息按照顺序进行消费。
3. 配置文件
```yaml
spring:
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: OrderProducerGroup
consumer:
group: OrderConsumerGroup
topic: OrderTopic
tag: *
consume-thread-max: 10
consume-orderly: true
```
在配置文件中,我们将消费者的consumeOrderly属性设置为true,保证同一个消费者实例中的消息按照顺序进行消费。
需要注意的是,订阅同一个主题的消息有序消费是针对同一个消费者实例的。如果有多个消费者实例同时订阅了同一个主题,那么消息的顺序无法得到保证。如果需要多个消费者实例同时订阅同一个主题的消息,并且保证消息的顺序,可以使用MessageListenerOrderly接口作为消息监听器,保证同一个消费者按照顺序消费同一个Message Queue中的消息。
阅读全文