有两个生产者:偶数生产者及奇数生产者,其中偶数生产者负责产生序列0,2,4,6,8…,奇数生产者负责产生1,3,5,7…。数据收集者负责打包一个数据,每个数据由一个偶数和一个奇数构成,并且一个数据中的奇偶数相差1,比如0和1构成一个数据,2和3构成一个数据。因此数据收集者必须负责等待当前轮次的奇偶数据都到了,才能产生一个数据,并放入缓存队列。消费者每次取出一个数据,并打印。偶数生产者和奇数生产者每次产生一个数后随机休眠(sleep)一段时间,范围是:100ms-150ms。
时间: 2024-02-25 11:55:42 浏览: 238
以下一个实现上述功能的 Python 代码:
```python
import threading
import time
import random
# 缓存队列
queue = []
lock = threading.Lock()
# 偶数生产者
def even_producer():
global queue
for i in range(0, 100, 2):
time.sleep(random.uniform(0.1, 0.15))
with lock:
queue.append(i)
print(f"偶数生产者生产了数据 {i}")
# 奇数生产者
def odd_producer():
global queue
for i in range(1, 100, 2):
time.sleep(random.uniform(0.1, 0.15))
with lock:
queue.append(i)
print(f"奇数生产者生产了数据 {i}")
# 数据收集者
def collector():
global queue
while True:
with lock:
if len(queue) < 2:
time.sleep(0.01)
continue
even = queue.pop(0)
odd = None
for i in range(len(queue)):
if abs(queue[i] - even) == 1:
odd = queue.pop(i)
break
if odd is not None:
data = (even, odd)
print(f"数据收集者收集了一组数据 {data}")
queue.append(data)
else:
queue.append(even)
# 消费者
def consumer():
global queue
while True:
with lock:
if len(queue) == 0:
time.sleep(0.01)
continue
data = queue.pop(0)
print(f"消费者消费了数据 {data}")
if isinstance(data, tuple):
for item in data:
queue.append(item)
time.sleep(0.1)
# 创建生产者、数据收集者和消费者线程
ep = threading.Thread(target=even_producer)
op = threading.Thread(target=odd_producer)
c = threading.Thread(target=collector)
cons = [threading.Thread(target=consumer) for _ in range(5)]
# 启动线程
ep.start()
op.start()
c.start()
for con in cons:
con.start()
# 等待线程结束
ep.join()
op.join()
c.join()
for con in cons:
con.join()
```
在上述代码中,我们使用了一个列表`queue`来作为缓存队列,其中存储的是偶数和奇数构成的数据。偶数生产者每次产生一个偶数,奇数生产者每次产生一个奇数,两个生产者都会随机休眠一段时间。数据收集者会等待当前轮次的奇偶数据都到了,才会将它们打包成一个数据,并放入缓存队列。消费者会从缓存队列中取出一个数据,并打印。如果取出的数据是一个数据对,则会将它们分别放回缓存队列中。
为了提高程序的并发性,我们使用了一个`Lock`对象来保证对缓存队列的操作是线程安全的。同时,我们创建了5个消费者线程来消费数据,以提高程序的吞吐量。
需要注意的是,在这个实现中,我们假设了数据收集者只会等待一轮偶数和奇数数据就能找到匹配的数据对,实际上这个假设可能不成立。如果数据收集者等待了太长时间,缓存队列中的数据可能会变得很大,从而导致程序的性能下降。为了解决这个问题,需要考虑更复杂的算法,例如用哈希表来保存奇偶数之间的关系,或者限制缓存队列的大小。
阅读全文