实验内容: 在本地创建一个data.txt文本文件,内容为个人信息(如图所示),借助Python执行如下操作: (1)编写生产者程序,将data.txt中数据发布到Kafka上(对应topic为个人姓名全拼)。 (2)编写消费者程序,读取Kafka中的数据,并输出对应数据。
时间: 2024-11-04 16:13:35 浏览: 50
实验内容是关于利用Python进行消息队列处理。首先,你需要准备好以下几个步骤:
1. **设置环境**:确保已安装`kafka-python`库,用于连接和操作Apache Kafka。你可以通过pip安装它:
```
pip install kafka-python
```
2. **创建Data.txt文件**:在本地创建一个名为`data.txt`的文本文件,内容应包含个人信息,例如:
```
Name,Age,Email
Alice,25,alice@example.com
Bob,30,bob@example.com
Carol,35,carol@example.com
```
3. **生产者程序(producer.py)**:这是一个Python脚本,用于从`data.txt`中读取数据并将其发布到指定主题(这里以全名作为主题):
```python
from kafka import KafkaProducer
import csv
producer = KafkaProducer(bootstrap_servers='localhost:9092') # 根据实际情况修改Kafka服务器地址
with open('data.txt', 'r') as f:
reader = csv.reader(f)
for row in reader:
topic_name = row[0].lower() # 将名字转为小写,作为主题名
message = '\n'.join(row) # 每条记录为单独的消息
producer.send(topic_name.encode(), key=topic_name.encode(), value=message.encode())
producer.close()
```
4. **消费者程序(consumer.py)**:消费Kafka中的数据并打印出来:
```python
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest', # 初始偏移量
enable_auto_commit=True,
group_id=None # 单个消费者的组ID可以设为None
)
consumer.subscribe(['alice', 'bob', 'carol']) # 调用订阅主题列表
for message in consumer:
topic = message.topic
data = json.loads(message.value.decode()) # 如果消息体是JSON格式,需转换
print(f"Topic: {topic}, Message: {data}")
consumer.close()
```
运行上述生产者和消费者程序,数据就会按照名字的主题分发并在控制台显示。
阅读全文