springboot+rocketmq实现代码,生产者1发送order并入库,消费者1解析order,给对应用户加1积分存入数据库,消费者2解析order,为订单生成一条物流信息, 生产者2发送用户信息,消费者3解析用户信息并入库,并说明生产者和消费者应该在何处调用
时间: 2024-05-02 19:16:26 浏览: 48
以下是Spring Boot RocketMQ实现代码:
生产者1发送order并入库:
```java
@Service
public class OrderProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderService orderService;
public void sendOrder(Order order) {
orderService.saveOrder(order); // 先将订单入库
rocketMQTemplate.convertAndSend("order_topic", order); // 发送订单消息
}
}
```
消费者1解析order,给对应用户加1积分存入数据库:
```java
@Service
@RocketMQMessageListener(topic = "order_topic", consumerGroup = "order_consumer_group")
public class OrderConsumer implements RocketMQListener<Order> {
@Autowired
private UserService userService;
@Override
public void onMessage(Order order) {
User user = userService.getUserById(order.getUserId());
user.setPoint(user.getPoint() + 1);
userService.updateUser(user); // 更新用户积分
}
}
```
消费者2解析order,为订单生成一条物流信息:
```java
@Service
@RocketMQMessageListener(topic = "order_topic", consumerGroup = "order_consumer_group")
public class OrderConsumer implements RocketMQListener<Order> {
@Autowired
private LogisticsService logisticsService;
@Override
public void onMessage(Order order) {
logisticsService.generateLogisticsInfo(order); // 生成物流信息
}
}
```
生产者2发送用户信息:
```java
@Service
public class UserProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private UserService userService;
public void sendUser(User user) {
userService.saveUser(user); // 先将用户入库
rocketMQTemplate.convertAndSend("user_topic", user); // 发送用户消息
}
}
```
消费者3解析用户信息并入库:
```java
@Service
@RocketMQMessageListener(topic = "user_topic", consumerGroup = "user_consumer_group")
public class UserConsumer implements RocketMQListener<User> {
@Autowired
private UserService userService;
@Override
public void onMessage(User user) {
userService.saveUser(user); // 将用户入库
}
}
```
生产者和消费者应该在业务代码中调用。生产者在业务逻辑完成后发送消息,消费者在收到消息后执行相应的业务逻辑。可以使用RocketMQ的事务消息,保证生产者和消费者的数据一致性。
阅读全文