RocketMQ 保证幂等性:用 RedisTemplate 写一个代码示例
时间: 2024-02-23 17:57:51 浏览: 81
RocketMQ实例
5星 · 资源好评率100%
好的,以下是使用 `DefaultMQProducer` 和 `TransactionMQProducer` 发送消息时配合幂等性保障的代码示例。需要注意:RocketMQ 只保证消息至少投递一次,生产者本身无法保证幂等,真正的幂等需要消费端依据消息的唯一键进行去重。
1. 使用 `DefaultMQProducer`
```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
/**
 * Minimal example of sending a RocketMQ message that carries a business key.
 *
 * <p>The key set on the message is what a consumer can later use to detect and
 * skip duplicate deliveries; the producer itself does not de-duplicate anything.
 */
public class IdempotentProducer {
    private static final String TOPIC_NAME = "test_topic";
    private static final String TAG_NAME = "test_tag";
    private static final String PRODUCER_GROUP_NAME = "test_group";

    /**
     * Starts a producer, sends a single keyed message and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if the producer cannot be started
     */
    public static void main(String[] args) throws Exception {
        // Create the producer and point it at the name server.
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP_NAME);
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            // Encode the payload with an explicit charset instead of the platform default.
            Message message = new Message(TOPIC_NAME, TAG_NAME,
                    "Hello world!".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Attach the business-unique key used for de-duplication on the consumer side.
            // setKeys is used on purpose: UNIQ_KEY
            // (MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) is a reserved system
            // property and putUserProperty rejects it with a RuntimeException.
            message.setKeys("123456");
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the producer's resources, even when sending failed.
            producer.shutdown();
        }
    }
}
```
2. 使用 `TransactionMQProducer`
```java
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.*;
/**
 * Minimal example of sending a RocketMQ transactional message.
 *
 * <p>The nested {@link TransactionListenerImpl} runs the local transaction and
 * remembers its outcome per transaction id, so that a later check-back for the
 * same transaction always receives the same answer.
 */
public class IdempotentTransactionProducer {
    private static final String TOPIC_NAME = "test_topic";
    private static final String TAG_NAME = "test_tag";
    private static final String PRODUCER_GROUP_NAME = "test_group";

    /**
     * Starts a transactional producer, sends one message and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if the producer cannot be started
     */
    public static void main(String[] args) throws Exception {
        // Create the transactional producer and point it at the name server.
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP_NAME);
        producer.setNamesrvAddr("localhost:9876");
        // The listener must be registered before the producer is started.
        producer.setTransactionListener(new TransactionListenerImpl());
        producer.start();
        try {
            Message message = new Message(TOPIC_NAME, TAG_NAME,
                    "Hello world!".getBytes(StandardCharsets.UTF_8));
            // Sends the half message and then invokes executeLocalTransaction on the listener.
            producer.sendMessageInTransaction(message, null);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // NOTE: a long-running service should keep the producer alive; a transaction
            // left in the UNKNOW state can only be resolved while a producer of this
            // group is still running to answer the broker's check-back.
            producer.shutdown();
        }
    }

    /**
     * Transaction listener that records the outcome of each local transaction and
     * answers check-backs from that record.
     */
    static class TransactionListenerImpl implements TransactionListener {
        /** The local transaction outcome is not known (yet). */
        private static final int STATUS_UNKNOWN = 0;
        /** The local transaction succeeded; the message may be delivered. */
        private static final int STATUS_COMMIT = 1;
        /** The local transaction failed; the message must be discarded. */
        private static final int STATUS_ROLLBACK = 2;

        /** Outcome of each local transaction, keyed by transaction id. */
        private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        /**
         * Runs the local transaction for a half message and reports its outcome.
         *
         * @param message the half message that was just sent
         * @param arg     the argument passed to {@code sendMessageInTransaction}
         * @return {@code COMMIT_MESSAGE} when the local transaction succeeded,
         *         {@code ROLLBACK_MESSAGE} when it failed
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
            String transactionId = message.getTransactionId();
            try {
                // Execute the local transaction logic here.
                // ...
                remember(transactionId, STATUS_COMMIT);
                return LocalTransactionState.COMMIT_MESSAGE;
            } catch (RuntimeException e) {
                // The local transaction failed: record it so check-backs agree with this answer.
                remember(transactionId, STATUS_ROLLBACK);
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }

        /**
         * Answers a check-back by looking up the recorded local transaction outcome.
         *
         * @param messageExt the half message whose transaction state is being checked
         * @return the recorded state, or {@code UNKNOW} when nothing has been recorded
         *         for this transaction id
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            String transactionId = messageExt.getTransactionId();
            // ConcurrentHashMap rejects null keys, so guard before the lookup.
            Integer status = transactionId == null ? null : localTrans.get(transactionId);
            if (status == null) {
                return LocalTransactionState.UNKNOW;
            }
            switch (status) {
                case STATUS_COMMIT:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case STATUS_ROLLBACK:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                case STATUS_UNKNOWN:
                default:
                    return LocalTransactionState.UNKNOW;
            }
        }

        /** Stores the outcome for a transaction id, ignoring messages that carry no id. */
        private void remember(String transactionId, int status) {
            if (transactionId != null) {
                localTrans.put(transactionId, status);
            }
        }
    }
}
```
阅读全文