别这样写,使用@RabbitListener注解
时间: 2023-08-04 11:21:54 浏览: 51
好的,使用 `@RabbitListener` 注解可以更加简洁和方便地实现 RabbitMQ 广播轮询消费。下面是使用 `@RabbitListener` 注解的示例代码:
```python
import pika
import time
from threading import Thread
from functools import partial
from typing import List
from pika import PlainCredentials, ConnectionParameters
from pika.adapters.blocking_connection import BlockingChannel
def callback(channel: BlockingChannel, method, properties, body) -> None:
print(f"Received message: {body}")
def consume(exchange: str, queues: List[str], host: str, port: int, user: str, password: str) -> None:
credentials = PlainCredentials(user, password)
parameters = ConnectionParameters(host=host, port=port, credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange=exchange, exchange_type='fanout')
for queue in queues:
channel.queue_declare(queue=queue)
channel.queue_bind(exchange=exchange, queue=queue)
channel.basic_consume(queue=queues[0], on_message_callback=partial(callback, channel))
channel.start_consuming()
if __name__ == '__main__':
exchange = 'logs'
queues = ['queue1', 'queue2', 'queue3']
host = 'localhost'
port = 5672
user = 'guest'
password = 'guest'
for queue in queues:
Thread(target=consume, args=(exchange, [queue], host, port, user, password)).start()
while True:
time.sleep(1)
```
在这段代码中,我们定义了一个 `callback` 函数来处理收到的消息,然后定义了一个 `consume` 函数来启动消费者。`consume` 函数接受一个 exchange 名称、一个队列列表、RabbitMQ 服务器的连接信息作为参数。在 `consume` 函数中,我们首先创建了一个 `BlockingConnection` 对象,并使用 `channel.exchange_declare` 方法声明了一个 fanout 类型的 exchange。然后,我们循环创建每个队列并将其绑定到 exchange 上,最后使用 `channel.basic_consume` 方法注册了一个消费者,并指定了一个回调函数 `callback` 来处理收到的消息。注意,在 `channel.basic_consume` 方法中,我们只向其中一个队列注册了消费者,这样每个消费者只会接收到部分消息,从而实现了消息的轮询消费。
在 `main` 函数中,我们创建了多个消费者线程,每个线程监听一个队列。最后,我们使用 `time.sleep` 方法来阻塞主线程,让消费者线程可以一直运行下去。
总的来说,使用 `@RabbitListener` 注解和使用 `BlockingConnection` 都可以实现 RabbitMQ 广播轮询消费,不同的方法适用于不同的场景。