RocketMQ延时消息实现
时间: 2024-05-17 18:16:25 浏览: 138
RocketMQ 的延时消息实现是通过消息的延迟级别(delay level)来实现的。延迟级别是在消息发送时指定的一个参数,它表示消息需要延迟的时间。RocketMQ 支持 18 个延迟级别,对应的延迟时间从 1 秒到 2 个半小时不等。
当一条延时消息被发送到 Broker 后,Broker 会将该消息保存到对应的延迟队列中,等待延迟时间到达后再将该消息发送给消费者。在发送消息时,可以通过设置消息的延迟级别来实现延时投递。
需要注意的是,延时消息的实现依赖于 Broker 的定时任务,因此在使用延时消息时需要确保 Broker 的定时任务正常运行。另外,由于延时队列占用了 Broker 的存储空间,因此需要根据实际需求合理设置延迟级别的个数和延迟时间的大小。
相关问题
rocketmq延时消息
RocketMQ支持延时消息,可以通过设置消息的延时级别(Delay Level)来指定消息的延时时间。延时级别是通过设置消息的属性来实现的。
在RocketMQ中,延时消息的实现原理是通过将消息发送到延时消息队列(Delay Queue)中,在指定的延时时间后再将消息从延时消息队列中取出,发送到目标消息队列中。
延时消息可以用于各种场景,比如定时任务、订单超时提醒等。
下面是使用RocketMQ发送延时消息的示例代码:
```
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者实例
producer.start();
// 创建消息实例,设置消息内容
Message msg = new Message("topic_name", "tag_name", "Hello RocketMQ".getBytes());
// 设置消息的延时级别为3,表示延时10s发送
msg.setDelayTimeLevel(3);
// 发送消息
SendResult result = producer.send(msg);
// 输出发送结果
System.out.println(result);
// 关闭生产者实例
producer.shutdown();
```
在这个示例中,我们创建了一个生产者实例,并设置了NameServer地址。然后创建一个消息实例,设置消息内容,并将延时级别设置为3。最后发送消息,并输出发送结果。
需要注意的是,延时消息的延时时间是在消息发送后计算的,而不是在消息创建时计算的。因此,如果需要精确控制延时时间,需要考虑网络延迟等因素。
RocketMQ延时消息源码
RocketMQ的延时消息实现是基于timerWheel的算法,主要包括以下几个步骤:
1. 发送延时消息时,将消息的过期时间与当前时间进行比较,如果过期时间比当前时间早,则直接丢弃消息。
2. 如果消息没有过期,则计算该消息应该被放置在timerWheel的哪个槽中,这个槽的位置是通过将消息的过期时间与timerWheel的时间轮进行计算得出的。
3. 将消息放置在相应的槽中,如果该槽中已经存在消息,则将新的消息插入到该槽中的链表的尾部。
4. 启动一个后台线程,定时遍历timerWheel,查找到期的消息,并将这些消息发送到对应的消费者。
以下是RocketMQ的延时消息源码实现:
1. 发送延时消息时,将消息的过期时间与当前时间进行比较,如果过期时间比当前时间早,则直接丢弃消息。
```
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long maxTimeout = System.currentTimeMillis() + timeout;
if (msg.getDelayTimeLevel() > 0) {
long delayTime = calculateDelayTime(msg.getDelayTimeLevel());
if ((msg.getBornTimestamp() + delayTime) < maxTimeout) {
msg.setDelayTimeLevel(0);
}
}
//...省略其他代码
}
```
2. 如果消息没有过期,则计算该消息应该被放置在timerWheel的哪个槽中,这个槽的位置是通过将消息的过期时间与timerWheel的时间轮进行计算得出的。
```
private static long calculateDelayTimeLevel(long delayTime) {
for (int i = 0; i < DELAY_LEVEL.length; i++) {
if (delayTime < DELAY_LEVEL[i]) {
return i;
}
}
return DELAY_LEVEL.length - 1;
}
private static long calculateDelayTime(long level) {
return DELAY_LEVEL[(int) Math.min(level, DELAY_LEVEL.length - 1)];
}
private long computeDeliverTimestamp(final long startDeliverTime, final long delayTimeLevel) {
long offset = DELAY_LEVEL[(int) delayTimeLevel];
return startDeliverTime + offset;
}
```
3. 将消息放置在相应的槽中,如果该槽中已经存在消息,则将新的消息插入到该槽中的链表的尾部。
```
public void scheduleMessage(MessageExt msg) {
long now = System.currentTimeMillis();
long deliverTimestamp = msg.getStoreTimestamp() + msg.getDelayTimeLevel();
long delay = deliverTimestamp - now;
if (delay < 0) {
delay = 0;
}
if (delay > config.getMaxDelayTime()) {
delay = config.getMaxDelayTime();
}
int index = (int) (delay / config.getTickTime());
long expireTime = deliverTimestamp - (deliverTimestamp % config.getTickTime());
try {
lock.writeLock().lockInterruptibly();
TimerTaskList taskList = timerWheel[index];
if (taskList == null) {
taskList = new TimerTaskList(expireTime);
timerWheel[index] = taskList;
}
taskList.add(msg);
} catch (InterruptedException e) {
log.error("scheduleMessage error, ", e);
} finally {
lock.writeLock().unlock();
}
}
```
4. 启动一个后台线程,定时遍历timerWheel,查找到期的消息,并将这些消息发送到对应的消费者。
```
private class TimerTask implements Runnable {
private int index;
private long currentTime;
public TimerTask(int index, long currentTime) {
this.index = index;
this.currentTime = currentTime;
}
@Override
public void run() {
try {
lock.writeLock().lockInterruptibly();
TimerTaskList taskList = timerWheel[index];
if (taskList != null) {
List<MessageExt> messages = taskList.remove(currentTime);
if (messages != null && !messages.isEmpty()) {
putMessagesInQueue(messages);
}
}
} catch (InterruptedException e) {
log.error("TimerTask error, ", e);
} finally {
lock.writeLock().unlock();
}
}
}
private class TimerThread implements Runnable {
private volatile boolean stop = false;
@Override
public void run() {
long lastExpireTime = 0;
while (!stop) {
long currentTime = System.currentTimeMillis();
if (currentTime > lastExpireTime) {
for (int i = 0; i < config.getWheelSize(); i++) {
long expireTime = lastExpireTime + (i + 1) * config.getTickTime();
if (expireTime <= currentTime) {
executor.execute(new TimerTask(i, expireTime));
}
}
lastExpireTime = currentTime;
} else {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
log.error("TimerThread error, ", e);
}
}
}
}
}
```
阅读全文