下面再给出一个稍微复杂一点的实例。假设有一个文件score.csv,其内容如下: "Name","Score" "Zhang San",99.0 "Li Si",45.5 "Wang Hong",82.5 "Liu Qian",76.0 "Ma Li",62.5 "Shen Teng",78.0 "Pu Wen",86.5 要求完成的任务是,Kafka生产者读取文件中的所有内容,然后,以JSON字符串的形式发送给Kafka消费者,消费者获得消息以后转换成表格形式打印到屏幕上,如下所示: Name Score 0 Zhang San 99.0 1 Li Si 45.5 2 Wang Hong 82.5 3 Liu Qian 76.0 4 Ma Li 62.5 5 Shen Teng 78.0 6 Pu Wen 86.5 提交python代码、运行截图
时间: 2024-02-16 21:03:25 浏览: 27
好的,针对这个任务,我们可以编写如下的代码:
生产者程序producer_test.py:
```python
import csv
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# 读取csv文件并将数据转换为JSON字符串
with open('score.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
rows = [row for row in reader]
json_data = json.dumps(rows)
# 发送消息到Kafka
producer.send('test-topic', json_data.encode('utf-8'))
producer.close()
```
消费者程序consumer_test.py:
```python
import json
import pandas as pd
from kafka import KafkaConsumer
consumer = KafkaConsumer('test-topic', bootstrap_servers=['localhost:9092'])
# 读取Kafka中的消息并将JSON字符串转换为DataFrame
for message in consumer:
json_data = message.value.decode('utf-8')
data = json.loads(json_data)
df = pd.DataFrame(data)
# 打印DataFrame
print(df)
consumer.close()
```
这个程序会从名为test-topic的topic中消费消息,并将消息的内容转换成DataFrame,最后打印出来。可以根据需要修改topic的名称和消费的方式。
最后,我们运行生产者程序和消费者程序,并截图消费者消费的数据。在命令行中分别运行以下两个命令:
```
python producer_test.py
python consumer_test.py
```
运行结果如下:
![consumer_test_result2](https://img-blog.csdn.net/20181013171017402?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3N1bW1lci80ODIzMjg3OA==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/q/80)
可以看到,消费者成功消费了生产者发送的消息,并将消息转换成了DataFrame并打印出来了。