单机时可以使用 map ConcurrentHashMap -> putIfAbsent guava cache
Redis
分布式锁搞起来。
11、 如何让 RocketMQ 保证消息的顺序消费
你们线上业务用消息中间件的时候,是否需要保证消息的顺序性?
如果不需要保证消息顺序,为什么不需要?假如我有一个场景要保证消息的顺序,你们应该
如何保证?
首先多个 queue 只能保证单个 queue 里的顺序,queue 是典型的 FIFO,天然顺序。多个 queue
同时消费是无法绝对保证消息的有序性的。所以总结如下:
同一 topic,同一个 QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消
费一个 queue 里的消息。
追问:怎么保证消息发到同一个 queue?
Rocket MQ 给我们提供了 MessageQueueSelector 接口,可以自己重写里面的接口,实现自己
的算法,举个最简单的例子:判断 i % 2 == 0 ,那就都放到 queue1 里,否则放到 queue2
里。
for (int i = 0; i < 5; i++) {
Message message = new Message("orderTopic", ("hello!" +i).getBytes());
producer.send(
//要发的那条消息
message,
//queue 选择器 ,向 topic 中的哪个 queue 去写消息
newMessageQueueSelector() {
//手动 选择一个 queue
@Override
publicMessageQueue select(
//当前 topic 里面包含的所有 queue
List<MessageQueue>mqs,
//具体要发的那条消息
Message msg,
//对应到 send() 里的 args,也就是 2000 前面的那个 0
Object arg) {
//向固定的一个 queue 里写消息,比如这里就是向第一个 queue 里写消息
if (Integer.parseInt(arg.toString()) % 2 == 0) {
return mqs.get(0);
} else{
return mqs.get(1);
}
评论0