rabbit mq延迟任务
时间: 2023-09-01 19:10:52 浏览: 105
RabbitMQ 支持延迟任务通过插件 rabbitmq_delayed_message_exchange 实现。该插件能够让你发送一个延迟消息,这个消息将会在指定的时间后才会被处理。下面是实现延迟任务的步骤:
1. 安装插件
首先需要安装插件,使用以下命令安装:
```bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```
2. 创建交换机
创建一个延迟交换机,可以使用下面的命令:
```bash
rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message arguments='{"x-delayed-type":"direct"}'
```
3. 发送延迟消息
使用延迟交换机来发送消息。将消息发送到该交换机,同时设置一个 x-delay 的 header,用来指定消息的延迟时间(单位是毫秒)。
```python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建延迟交换机
channel.exchange_declare(exchange='delayed_exchange', exchange_type='x-delayed-message', arguments={
'x-delayed-type': 'direct'
})
# 发送延迟消息
delay_time = 5000 # 延迟 5 秒
message_body = 'Hello, world!'
channel.basic_publish(
exchange='delayed_exchange',
routing_key='delayed_routing_key',
body=message_body,
properties=pika.BasicProperties(
headers={'x-delay': delay_time}
)
)
print(f"Sent '{message_body}' with delay {delay_time}ms")
connection.close()
```
4. 处理延迟消息
在接收端,正常处理消息即可。当消息到达时,RabbitMQ 会将其路由到指定的队列。
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='delayed_queue', durable=True)
# 绑定队列到延迟交换机
channel.queue_bind(queue='delayed_queue', exchange='delayed_exchange', routing_key='delayed_routing_key')
# 处理消息
def callback(ch, method, properties, body):
print(f"Received: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='delayed_queue', on_message_callback=callback)
print('Waiting for messages...')
channel.start_consuming()
```
这样就可以实现 RabbitMQ 的延迟任务了。
阅读全文