请使用confluent_kafka,生产者和消费者的python3代码,生产者函数指定输入字典,消费者函数返回这个字典
时间: 2024-03-05 17:50:08 浏览: 183
好的,以下是使用confluent_kafka库编写的生产者和消费者代码,生产者可以输入一个字典并将其转换成json字符串后发送,消费者可以接收到消息并解析出字典并进行返回。
```python
from confluent_kafka import Producer, Consumer, KafkaError
import json
# kafka配置信息
conf = {
'bootstrap.servers': 'localhost:9092', # kafka broker地址
'client.id': 'producer',
'group.id': 'my_group',
'auto.offset.reset': 'earliest'
}
# 创建生产者对象
producer = Producer(conf)
# 创建消费者对象
consumer = Consumer(conf)
# 发送消息到指定主题
def send_message(topic, message):
producer.produce(topic, value=json.dumps(message))
producer.flush()
# 从指定主题消费消息并返回
def consume_message(topic):
# 订阅指定主题
consumer.subscribe([topic])
# 消费消息
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'.format(msg.topic(), msg.partition()))
else:
print('Error occured: {0}'.format(msg.error().str()))
else:
# 解析出字典并返回
data = json.loads(msg.value().decode('utf-8'))
return data
```
注意事项:
1. 在使用时需要先安装confluent_kafka库,可以通过pip install confluent_kafka命令进行安装。
2. 在发送和接收消息时需要指定相同的主题名称。
3. 在发送消息时需要将字典转换成json字符串进行发送,接收消息后需要将json字符串解析成字典进行操作。
4. 在调用consume_message函数时需要使用while循环不断消费消息,可以通过其他方式来停止循环。
阅读全文