from kafka import KafkaProducer, KafkaConsumer from kafka.errors import kafka_errorsimport traceback import json def producer_demo(): Pass def consumer_demo(): Pass 具体执行效果如下: 先执行消费者: consumer_demo() 再执行生产者: producer_demo() 会看到如下输出: >>> producer_demo() send 0 send 1 send 2 >>> consumer_demo() receive, key: count_num, value: 0 receive, key: count_num, value: 1 receive, key: count_num, value: 2
时间: 2024-03-22 08:37:35 浏览: 64
好的,根据您提供的代码,这是一个使用Kafka实现消息生产者和消费者的Demo。代码中的producer_demo和consumer_demo函数目前为空,需要我们填充代码来实现生产者和消费者的功能。
下面是完整代码:
```python
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
# 生产者函数
def producer_demo():
try:
# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# 发送三条消息
for i in range(3):
data = {'count_num': i}
# 转换为JSON格式并发送
producer.send('test', json.dumps(data).encode('utf-8'))
print("send", i)
# 关闭连接
producer.close()
except kafka_errors.KafkaError:
traceback.print_exc()
# 消费者函数
def consumer_demo():
try:
# 创建KafkaConsumer实例
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])
# 循环消费消息
for msg in consumer:
# 解析消息的key和value
key = msg.key.decode('utf-8')
value = json.loads(msg.value.decode('utf-8'))
print(f"receive, key: {key}, value: {value['count_num']}")
# 关闭连接
consumer.close()
except kafka_errors.KafkaError:
traceback.print_exc()
# 先执行消费者函数
consumer_demo()
# 再执行生产者函数
producer_demo()
```
以上代码中,我们使用了KafkaProducer和KafkaConsumer模块来实现消息的生产和消费。在producer_demo函数中,我们使用KafkaProducer实例的send方法来发送三条消息,并使用json.dumps函数将数据转换为JSON格式。在consumer_demo函数中,我们使用KafkaConsumer实例来订阅'test'主题,并循环消费所有消息。在循环中,我们使用msg.key和msg.value获取消息的key和value,并使用json.loads函数将value转换为Python对象。最后,我们分别先执行consumer_demo函数,再执行producer_demo函数,可以看到输出结果符合预期。
阅读全文