给我写个rabbitmq 交换机类型为headers的消息提供者和消费者的案例
时间: 2024-03-10 11:46:04 浏览: 140
RabbitMQ与SpringBoot整合.docx
好的,以下是一个使用Header Exchange的RabbitMQ消息生产者和消费者的案例。
生产者代码:
```python
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建一个headers类型的Exchange
channel.exchange_declare(exchange='my_headers_exchange', exchange_type='headers')
# 生产者发送消息到Exchange
message = 'Hello World!'
headers = {'color': 'blue', 'size': 'small'}
channel.basic_publish(exchange='my_headers_exchange', routing_key='', body=message, properties=pika.BasicProperties(headers=headers))
print("Sent message: %r" % message)
# 关闭连接
connection.close()
```
消费者代码:
```python
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建一个headers类型的Exchange
channel.exchange_declare(exchange='my_headers_exchange', exchange_type='headers')
# 创建一个随机队列,并将它绑定到Exchange
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
headers = {'color': 'blue', 'size': 'small'}
channel.queue_bind(exchange='my_headers_exchange', queue=queue_name, arguments=headers)
print('Waiting for messages...')
# 定义一个回调函数来处理接收到的消息
def callback(ch, method, properties, body):
print("Received message: %r" % body)
# 开始消费消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
在这个例子中,我们创建了一个名为`my_headers_exchange`的Header Exchange,并使用`color`和`size`两个键值对来进行消息的路由。生产者发送了一条包含`Hello World!`消息体和`{'color': 'blue', 'size': 'small'}`键值对的Header的消息到Exchange。消费者创建了一个随机队列,并将它绑定到Exchange上,同时指定了`{'color': 'blue', 'size': 'small'}`作为绑定参数。当生产者发送的消息的Header与绑定参数匹配时,消息将被路由到该队列中,并由消费者进行处理。
阅读全文