用pycham编写3. Kafka消费者订阅分区 在命令行窗口中启动Kafka后,手动创建主题“assgin tonic”分区就号*2。具体命令1下: .bin\windows kkafka-topics.bat --create--zo0keeper loca 1heet.210renlicatiofEactor 1 --partitions 2 --topic assign_topic 根据上面给出的主题,完成如下操作。 (1)编写生产者程序,以通用唯一标识符UUID作为消息,发送给主题“assign topic”"。 (2)编写消费者程子1,阅主题的分区0,只消费分区0数据。 (3)编写消费者程了2,阅土题的分区1,只消费分区1数据。
时间: 2024-02-06 13:12:31 浏览: 133
好的,我来回答你的问题。
首先,你需要安装kafka-python包。可以使用以下命令在命令行窗口中安装:
```
pip install kafka-python
```
接下来,我们编写生产者程序,以通用唯一标识符UUID作为消息,发送给主题“assign_topic”。代码如下:
```python
from kafka import KafkaProducer
import uuid
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
for i in range(10):
message = str(uuid.uuid4())
producer.send('assign_topic', message.encode('utf-8'))
producer.close()
```
这段代码会向“assign_topic”主题发送10条消息,每条消息都是一个唯一标识符UUID。
然后,我们编写消费者程序1,订阅主题的分区0,只消费分区0的数据。代码如下:
```python
from kafka import KafkaConsumer
consumer = KafkaConsumer('assign_topic', bootstrap_servers=['localhost:9092'], partition_assignment_strategy=[0])
for message in consumer:
print(message)
consumer.close()
```
这段代码会订阅“assign_topic”主题的分区0,并且只消费分区0的数据。程序会把接收到的消息打印出来。
最后,我们编写消费者程序2,订阅主题的分区1,只消费分区1的数据。代码如下:
```python
from kafka import KafkaConsumer
consumer = KafkaConsumer('assign_topic', bootstrap_servers=['localhost:9092'], partition_assignment_strategy=[1])
for message in consumer:
print(message)
consumer.close()
```
这段代码会订阅“assign_topic”主题的分区1,并且只消费分区1的数据。程序会把接收到的消息打印出来。
注意,这里使用了`partition_assignment_strategy`参数来指定消费者订阅的分区。这样就可以控制消费者只消费特定的分区数据了。
阅读全文