rocketmq 怎么做分布式事务
时间: 2023-10-26 10:46:38 浏览: 90
RocketMQ提供了一种分布式事务解决方案,叫做“基于两阶段提交协议的分布式事务解决方案”。具体实现方式如下:
1. 发送方向MQ服务器发送消息,并进入本地事务执行阶段,此时消息处于“预提交”状态。
2. 发送方向MQ服务器发送“确认提交”的请求,如果MQ服务器收到请求,则将消息提交到MQ Broker上。
3. 发送方在本地事务执行成功后,向MQ服务器发送“确认提交”的请求,此时MQ服务器会将消息提交到MQ Broker上。
4. 如果发送方在本地事务执行期间发生异常,会向MQ服务器发送“确认回滚”的请求,此时MQ服务器会将消息从MQ Broker上删除。
5. 如果发送方在本地事务执行成功后,向MQ服务器发送“确认提交”的请求失败,则MQ服务器会将消息从MQ Broker上删除。
通过以上方式,RocketMQ实现了分布式事务的一致性和可靠性。
相关问题
rocketmq 做分布式事务代码
RocketMQ 提供了事务消息的支持,可以实现分布式事务。
下面是一个使用 RocketMQ 实现分布式事务的示例代码:
```java
public class TransactionalProducer {
private TransactionMQProducer producer;
private String nameServerAddr;
public TransactionalProducer(String nameServerAddr) {
this.nameServerAddr = nameServerAddr;
this.producer = new TransactionMQProducer("transactional_producer_group");
this.producer.setNamesrvAddr(nameServerAddr);
this.producer.setTransactionListener(new TransactionListenerImpl());
}
public void start() throws MQClientException {
this.producer.start();
}
public void shutdown() {
this.producer.shutdown();
}
public SendResult sendMessageInTransaction(Message message, Object argument) throws MQClientException {
return this.producer.sendMessageInTransaction(message, argument);
}
private static class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (status != null) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
```
该示例代码中,TransactionalProducer 类实现了 RocketMQ 的事务消息生产者,其中 setTransactionListener 方法设置了事务监听器,实现了 executeLocalTransaction 和 checkLocalTransaction 两个方法。
executeLocalTransaction 方法是执行本地事务的方法,根据业务逻辑实现该方法,返回 LocalTransactionState 枚举类型的值,表示事务状态。
checkLocalTransaction 方法是检查本地事务的状态的方法,根据本地事务执行的结果,实现该方法,返回 LocalTransactionState 枚举类型的值,表示事务状态。
在 sendMessageInTransaction 方法中,调用了 sendMessageInTransaction 方法来发送事务消息。在消息发送成功后,根据业务逻辑执行本地事务,如果本地事务执行成功,则返回 LocalTransactionState.COMMIT_MESSAGE,否则返回 LocalTransactionState.ROLLBACK_MESSAGE。
RocketMQ 会调用 checkLocalTransaction 方法来检查本地事务的状态,根据返回的值来确定是否提交或回滚事务消息。
阅读全文