RocketMQ延时消息源码
时间: 2023-10-12 13:29:33 浏览: 65
RocketMQ的延时消息实现是基于timerWheel的算法,主要包括以下几个步骤:
1. 发送延时消息时,将消息的过期时间与当前时间进行比较,如果过期时间比当前时间早,则直接丢弃消息。
2. 如果消息没有过期,则计算该消息应该被放置在timerWheel的哪个槽中,这个槽的位置是通过将消息的过期时间与timerWheel的时间轮进行计算得出的。
3. 将消息放置在相应的槽中,如果该槽中已经存在消息,则将新的消息插入到该槽中的链表的尾部。
4. 启动一个后台线程,定时遍历timerWheel,查找到期的消息,并将这些消息发送到对应的消费者。
以下是RocketMQ的延时消息源码实现:
1. 发送延时消息时,将消息的过期时间与当前时间进行比较,如果过期时间比当前时间早,则直接丢弃消息。
```
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long maxTimeout = System.currentTimeMillis() + timeout;
if (msg.getDelayTimeLevel() > 0) {
long delayTime = calculateDelayTime(msg.getDelayTimeLevel());
if ((msg.getBornTimestamp() + delayTime) < maxTimeout) {
msg.setDelayTimeLevel(0);
}
}
//...省略其他代码
}
```
2. 如果消息没有过期,则计算该消息应该被放置在timerWheel的哪个槽中,这个槽的位置是通过将消息的过期时间与timerWheel的时间轮进行计算得出的。
```
private static long calculateDelayTimeLevel(long delayTime) {
for (int i = 0; i < DELAY_LEVEL.length; i++) {
if (delayTime < DELAY_LEVEL[i]) {
return i;
}
}
return DELAY_LEVEL.length - 1;
}
private static long calculateDelayTime(long level) {
return DELAY_LEVEL[(int) Math.min(level, DELAY_LEVEL.length - 1)];
}
private long computeDeliverTimestamp(final long startDeliverTime, final long delayTimeLevel) {
long offset = DELAY_LEVEL[(int) delayTimeLevel];
return startDeliverTime + offset;
}
```
3. 将消息放置在相应的槽中,如果该槽中已经存在消息,则将新的消息插入到该槽中的链表的尾部。
```
public void scheduleMessage(MessageExt msg) {
long now = System.currentTimeMillis();
long deliverTimestamp = msg.getStoreTimestamp() + msg.getDelayTimeLevel();
long delay = deliverTimestamp - now;
if (delay < 0) {
delay = 0;
}
if (delay > config.getMaxDelayTime()) {
delay = config.getMaxDelayTime();
}
int index = (int) (delay / config.getTickTime());
long expireTime = deliverTimestamp - (deliverTimestamp % config.getTickTime());
try {
lock.writeLock().lockInterruptibly();
TimerTaskList taskList = timerWheel[index];
if (taskList == null) {
taskList = new TimerTaskList(expireTime);
timerWheel[index] = taskList;
}
taskList.add(msg);
} catch (InterruptedException e) {
log.error("scheduleMessage error, ", e);
} finally {
lock.writeLock().unlock();
}
}
```
4. 启动一个后台线程,定时遍历timerWheel,查找到期的消息,并将这些消息发送到对应的消费者。
```
private class TimerTask implements Runnable {
private int index;
private long currentTime;
public TimerTask(int index, long currentTime) {
this.index = index;
this.currentTime = currentTime;
}
@Override
public void run() {
try {
lock.writeLock().lockInterruptibly();
TimerTaskList taskList = timerWheel[index];
if (taskList != null) {
List<MessageExt> messages = taskList.remove(currentTime);
if (messages != null && !messages.isEmpty()) {
putMessagesInQueue(messages);
}
}
} catch (InterruptedException e) {
log.error("TimerTask error, ", e);
} finally {
lock.writeLock().unlock();
}
}
}
private class TimerThread implements Runnable {
private volatile boolean stop = false;
@Override
public void run() {
long lastExpireTime = 0;
while (!stop) {
long currentTime = System.currentTimeMillis();
if (currentTime > lastExpireTime) {
for (int i = 0; i < config.getWheelSize(); i++) {
long expireTime = lastExpireTime + (i + 1) * config.getTickTime();
if (expireTime <= currentTime) {
executor.execute(new TimerTask(i, expireTime));
}
}
lastExpireTime = currentTime;
} else {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
log.error("TimerThread error, ", e);
}
}
}
}
}
```
相关推荐
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)