rocketmq 如何保证订单的创建、支付成功顺序消费 代码实现
时间: 2023-08-22 21:35:36 浏览: 76
基于SpringBoot Dubbo RocketMQ的订单支付系统源码+项目说明+数据库.zip
在RocketMQ中,可以通过使用顺序消息来保证订单的创建、支付成功的顺序消费。具体实现步骤如下:
1. 创建顺序消息生产者
```java
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
```
2. 发送订单创建消息
```java
Message message = new Message("order_topic", "order_create_tag", "order_id_001", "order_create_message".getBytes());
// 设置顺序消息的业务ID,用来保证消息发送的顺序
message.setKeys("order_id_001");
// 发送顺序消息
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
// 获取订单ID
String orderId = (String) o;
// 计算订单ID的hash值
int index = Math.abs(orderId.hashCode()) % list.size();
// 选择消息队列
return list.get(index);
}
}, "order_id_001");
```
3. 发送订单支付成功消息
```java
Message message = new Message("order_topic", "order_pay_success_tag", "order_id_001", "order_pay_success_message".getBytes());
// 设置顺序消息的业务ID,用来保证消息发送的顺序
message.setKeys("order_id_001");
// 发送顺序消息
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
// 获取订单ID
String orderId = (String) o;
// 计算订单ID的hash值
int index = Math.abs(orderId.hashCode()) % list.size();
// 选择消息队列
return list.get(index);
}
}, "order_id_001");
```
在订单创建消息和订单支付成功消息中,我们都需要设置相同的业务ID,并使用`MessageQueueSelector`选择同一个消息队列,这样才能保证同一个订单的消息被发送到同一个消息队列中。
4. 创建顺序消息消费者
```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order_topic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
// 消费消息
for (MessageExt messageExt : list) {
if ("order_create_tag".equals(messageExt.getTags())) {
System.out.println("订单创建消息:" + new String(messageExt.getBody()));
} else if ("order_pay_success_tag".equals(messageExt.getTags())) {
System.out.println("订单支付成功消息:" + new String(messageExt.getBody()));
}
}
// 返回消费状态
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
```
在订单消息的消费端,我们需要注册`MessageListenerOrderly`监听器,按照顺序消费消息。在消费消息时,我们需要根据消息的Tag进行判断,如果是订单创建消息,则执行订单创建逻辑;如果是订单支付成功消息,则执行订单支付成功逻辑。
需要注意的是,为了保证订单的创建、支付成功顺序消费,我们需要保证同一个订单的消息被发送到同一个消息队列中,这样才能保证消息的消费顺序。
阅读全文