Python 通过四条线程并行,两个队列接受Mq消息,接收的消息要求放到不同的队列中,两个队列发送mq消息
时间: 2023-09-18 14:10:17 浏览: 89
可以使用Python的`threading`模块来实现四条线程并行。首先需要安装`pika`包来连接MQ消息队列,然后定义两个队列用于接收消息和两个队列用于发送消息。具体代码如下:
```python
import threading
import pika
# 连接MQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 定义两个队列用于接收消息
queue_1 = 'queue_1'
channel.queue_declare(queue=queue_1)
queue_2 = 'queue_2'
channel.queue_declare(queue=queue_2)
# 定义两个队列用于发送消息
send_queue_1 = 'send_queue_1'
channel.queue_declare(queue=send_queue_1)
send_queue_2 = 'send_queue_2'
channel.queue_declare(queue=send_queue_2)
# 四条线程并行
def receive_thread_1():
def callback(ch, method, properties, body):
channel.basic_publish(exchange='', routing_key=send_queue_1, body=body)
print('Received message in queue_1:', body.decode())
channel.basic_consume(queue=queue_1, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
def receive_thread_2():
def callback(ch, method, properties, body):
channel.basic_publish(exchange='', routing_key=send_queue_2, body=body)
print('Received message in queue_2:', body.decode())
channel.basic_consume(queue=queue_2, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
def send_thread_1():
while True:
method, properties, body = channel.basic_get(send_queue_1)
if body:
channel.basic_publish(exchange='', routing_key=queue_2, body=body)
print('Sent message from send_queue_1 to queue_2:', body.decode())
def send_thread_2():
while True:
method, properties, body = channel.basic_get(send_queue_2)
if body:
channel.basic_publish(exchange='', routing_key=queue_1, body=body)
print('Sent message from send_queue_2 to queue_1:', body.decode())
# 启动四条线程
t1 = threading.Thread(target=receive_thread_1)
t2 = threading.Thread(target=receive_thread_2)
t3 = threading.Thread(target=send_thread_1)
t4 = threading.Thread(target=send_thread_2)
t1.start()
t2.start()
t3.start()
t4.start()
```
以上代码中,分别定义了四个线程,其中两个线程用于接收消息,两个线程用于发送消息。在接收消息的线程中,通过`basic_consume`方法来监听队列中的消息,当接收到消息时,通过`basic_publish`方法将消息发送到对应的发送队列中。在发送消息的线程中,通过`basic_get`方法从发送队列中取出消息,然后通过`basic_publish`方法将消息发送到另一个队列中。同时,在每次发送或接收消息时,都会打印出相应的日志,方便调试和观察。
阅读全文