生成代码:0.生产者,产生一个大于20亿的数字,消费者判断数字是否为素数1、生产者、缓存项目和消费者分别各自独立项目,以三个进程独立启动 2、选择一个消息队列服务使用(MSMQ、RocketMQ等等,备注:可自建消息队
时间: 2024-02-25 13:55:26 浏览: 23
列)。生产者将生成的大于20亿的数字发送到消息队列中,消费者从消息队列中接收数字并判断是否为素数。如果是素数,则将结果发送到另一个消息队列中,由生产者从中接收并输出结果。
以下是示例代码,使用Python的pika库实现RabbitMQ消息队列服务:
**生产者代码:**
```python
import pika
import random
# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明消息队列
channel.queue_declare(queue='number_queue')
# 生产随机数并发送到消息队列
number = random.randint(2000000000, 3000000000)
channel.basic_publish(exchange='', routing_key='number_queue', body=str(number))
print(" [x] Sent %r" % number)
# 关闭连接
connection.close()
```
**消费者代码:**
```python
import pika
import math
# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明消息队列
channel.queue_declare(queue='number_queue')
channel.queue_declare(queue='prime_queue')
# 定义判断素数的函数
def is_prime(n):
if n <= 1:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(math.sqrt(n)) + 1, 2):
if n % i == 0:
return False
return True
# 定义回调函数,接收消息并判断是否为素数
def callback(ch, method, properties, body):
number = int(body)
if is_prime(number):
channel.basic_publish(exchange='', routing_key='prime_queue', body=str(number))
print(" [x] Received %r is a prime number" % number)
else:
print(" [x] Received %r is not a prime number" % number)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 限制同时只能处理一个消息
channel.basic_qos(prefetch_count=1)
# 接收消息并调用回调函数处理
channel.basic_consume(queue='number_queue', on_message_callback=callback)
# 开始接收消息并进入阻塞状态
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```
**输出结果:**
生产者输出:
```
[x] Sent 2951416152
```
消费者输出:
```
[*] Waiting for messages. To exit press CTRL+C
[x] Received 2951416152 is not a prime number
```
如果生产者生成的数字为素数,则消费者输出为:
```
[x] Received 2147483647 is a prime number
```