请将他们包装成两个函数
时间: 2023-11-27 08:03:59 浏览: 61
c++11的万能函数包装器和消息循环demo
好的,以下是将生产者和消费者代码封装成函数的示例代码:
```python
from confluent_kafka import Producer, Consumer, KafkaError
import json
# kafka配置信息
conf = {
'bootstrap.servers': 'localhost:9092', # kafka broker地址
'client.id': 'producer',
'group.id': 'my_group',
'auto.offset.reset': 'earliest'
}
# 创建生产者对象
producer = Producer(conf)
# 创建消费者对象
consumer = Consumer(conf)
# 发送消息到指定主题
def send_message(topic, message):
producer.produce(topic, value=json.dumps(message))
producer.flush()
# 订阅指定主题并消费消息
def consume_message(topic):
# 订阅指定主题
consumer.subscribe([topic])
# 消费消息
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'.format(msg.topic(), msg.partition()))
else:
print('Error occured: {0}'.format(msg.error().str()))
else:
# 解析出字典并输出
data = json.loads(msg.value().decode('utf-8'))
print(data)
```
注意事项:
1. 在使用时需要先安装confluent_kafka库,可以通过pip install confluent_kafka命令进行安装。
2. 在发送和接收消息时需要指定相同的主题名称。
3. 在发送消息时需要将字典转换成json字符串进行发送,接收消息后需要将json字符串解析成字典进行操作。
4. 在调用consume_message函数时需要使用while循环不断消费消息,可以通过其他方式来停止循环。
阅读全文