kafka 自定义一个json文件,根据文件,编写生产者和消费者python程序,完成发送和接收
时间: 2023-12-10 18:40:32 浏览: 100
首先,我们需要安装kafka-python库,可以使用以下命令进行安装:
```
pip install kafka-python
```
接下来,我们可以编写生产者程序,示例代码如下:
```python
from kafka import KafkaProducer
import json
# 读取json文件
with open('data.json', 'r') as f:
data = json.load(f)
# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# 发送消息
producer.send('test_topic', json.dumps(data).encode('utf-8'))
# 关闭连接
producer.close()
```
其中,我们首先使用`json`库读取一个名为`data.json`的json文件,然后创建了一个`KafkaProducer`实例,使用`send`方法向指定的topic发送消息,消息内容为json文件中的数据。最后,我们需要关闭连接。
接下来,我们可以编写消费者程序,示例代码如下:
```python
from kafka import KafkaConsumer
import json
# 创建KafkaConsumer实例
consumer = KafkaConsumer('test_topic', bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', enable_auto_commit=True)
# 消费消息
for message in consumer:
data = json.loads(message.value.decode('utf-8'))
print(data)
# 关闭连接
consumer.close()
```
我们首先创建了一个`KafkaConsumer`实例,指定了要消费的topic、bootstrap_servers、auto_offset_reset和enable_auto_commit等参数。然后,在一个循环中,我们使用`json`库将收到的消息解码为json格式,并输出到控制台上。最后,我们需要关闭连接。
注意:上述代码中`auto_offset_reset`参数设置为`earliest`,表示当消费者第一次启动时,从最早的offset开始消费消息。如果想从最新的offset开始消费消息,可以将其设置为`latest`。
阅读全文