请使用confluent_kafka,生产者和消费者的python3代码,生产者可指定输入一个字典,消费者输出这个字典
时间: 2024-03-05 07:50:05 浏览: 72
confluent-kafka-python:Confluent的Kafka Python客户端
好的,以下是使用confluent_kafka库编写的生产者和消费者代码,生产者可以输入一个字典并将其转换成json字符串后发送,消费者可以接收到消息并解析出字典并进行输出。
生产者代码:
```python
from confluent_kafka import Producer
import json
# kafka配置信息
conf = {
'bootstrap.servers': 'localhost:9092', # kafka broker地址
'client.id': 'producer'
}
producer = Producer(conf)
# 发送消息到指定主题
def send_message(topic, message):
producer.produce(topic, value=json.dumps(message))
producer.flush()
if __name__ == '__main__':
# 发送一个字典
data = {'name': '张三', 'age': 18}
send_message('test_topic', data)
```
消费者代码:
```python
from confluent_kafka import Consumer, KafkaError
import json
# kafka配置信息
conf = {
'bootstrap.servers': 'localhost:9092', # kafka broker地址
'group.id': 'my_group',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
# 订阅指定主题
consumer.subscribe(['test_topic'])
# 消费消息
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'.format(msg.topic(), msg.partition()))
else:
print('Error occured: {0}'.format(msg.error().str()))
else:
# 解析出字典并输出
data = json.loads(msg.value().decode('utf-8'))
print(data)
```
注意事项:
1. 在使用时需要先安装confluent_kafka库,可以通过pip install confluent_kafka命令进行安装。
2. 在发送和接收消息时需要指定相同的主题名称。
3. 在发送消息时需要将字典转换成json字符串进行发送,接收消息后需要将json字符串解析成字典进行操作。
阅读全文