1. 根据上面给出的主题,完成如下操作: (1)编写生产者程序,以通用唯一标识符UUID作为消息,发送给主题assign_topic; (2)编写消费者程序1,订阅主题的分区0,只消费分区0数据; (3)编写消费者程序2,订阅主题的分区1,只消费分区1数据;
时间: 2023-12-13 09:05:38 浏览: 118
利用单线程完成的生产者与消费者程序
好的,我了解了你的问题。针对你的需求,我可以给出以下的代码实现:
生产者程序:
```python
from kafka import KafkaProducer
import uuid
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
for i in range(10):
msg = str(uuid.uuid4()).encode('utf-8')
producer.send('assign_topic', msg)
```
消费者程序1:
```python
from kafka import KafkaConsumer
consumer = KafkaConsumer('assign_topic', bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', partition_assignment=[{'topic': 'assign_topic', 'partition': 0}])
for message in consumer:
print("消费者1消费消息: {}".format(message.value.decode('utf-8')))
```
消费者程序2:
```python
from kafka import KafkaConsumer
consumer = KafkaConsumer('assign_topic', bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', partition_assignment=[{'topic': 'assign_topic', 'partition': 1}])
for message in consumer:
print("消费者2消费消息: {}".format(message.value.decode('utf-8')))
```
这些代码使用了 Python 中的 Kafka 库,可以通过 pip 安装。在这里,我们使用了 `uuid` 库来生成消息的唯一标识符。生产者程序不断地发送消息,而两个消费者程序分别消费了主题的不同分区的数据。注意,消费者程序需要指定消费哪个主题、哪个分区,并且需要指定 `auto_offset_reset` 参数为 `earliest`,以便消费者能够消费旧有的消息。
阅读全文