生成一个data.json文件,内容如下: [ {"name":"Tony","age":"21","hobbies" : ["basketball","tennis"]}, {"name":"Lisa","age":"20","hobbies" : ["sing","dance"]} ] 1. 根据上面给出的data.json文件,执行如下操作: (1)编写生产者程序,将json文件数据发送给Kafka; (2)编写消费者程序,读取Kafka的json文件数据,并手动提交偏移量。
时间: 2023-06-28 09:08:22 浏览: 328
以下是Python代码实现:
生产者程序:
```python
from kafka import KafkaProducer
import json
# 读取json文件
with open("data.json", "r") as f:
data = json.loads(f.read())
# 连接Kafka
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
# 发送数据到Kafka
for d in data:
producer.send('data', json.dumps(d).encode())
producer.flush()
# 关闭连接
producer.close()
```
消费者程序:
```python
from kafka import KafkaConsumer
import json
# 连接Kafka
consumer = KafkaConsumer('data', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=False)
# 读取数据并手动提交偏移量
for message in consumer:
data = json.loads(message.value.decode())
print(data)
consumer.commit()
```
需要注意的是,上述代码中使用了`json`模块来读取和序列化JSON数据。此外,消费者程序中的`enable_auto_commit=False`参数表示关闭自动提交偏移量,需要手动调用`commit()`方法来提交偏移量。
相关推荐
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![rar](https://img-home.csdnimg.cn/images/20210720083606.png)
![-](https://csdnimg.cn/download_wenku/file_type_column_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)