在本地创建一个data.txt文本文件,内容为个人信息(如图所示),借助Python执行如下操作: (1)编写生产者程序,将data.txt中数据发布到Kafka上(对应topic为个人姓名全拼)。 (2)编写消费者程序,读取Kafka中的数据,并输出对应数据。
时间: 2024-11-03 15:14:21 浏览: 6
实验四:Python程序设计之文件data.txt数据文件,完整版,可以画出完整图形
首先,你需要安装`kafka-python`库,这是一个用于Apache Kafka的Python客户端。你可以使用pip进行安装:
```bash
pip install kafka-python
```
接下来,我会分别介绍如何编写生产者和消费者的Python代码:
### 生产者(Producer):
```python
from kafka import KafkaProducer
import json
# 创建一个Producer实例
producer = KafkaProducer(bootstrap_servers='localhost:9092') # 根据你的Kafka服务器配置替换
def write_to_file(name):
with open('data.txt', 'r') as f:
data = json.load(f) # 假设data.txt的内容是JSON格式
topic_name = name.lower().replace(" ", "") # 将姓名转换为小写并去除空格作为主题名
# 发布消息到Kafka
producer.send(topic=topic_name, value=data)
# 示例个人信息字典
personal_info = {
"name": "张三",
"age": 30,
"email": "zhangsan@example.com"
}
write_to_file(personal_info["name"]) # 使用实际姓名替换这个值
producer.flush() # 确保所有消息都被发送出去
```
### 消费者(Consumer):
```python
from kafka import KafkaConsumer
import json
def consume_from_kafka():
consumer = KafkaConsumer(
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my_group' # 消费者组ID,可以自定义
)
consumer.subscribe(['zhansan']) # 替换为你存储数据的主题名
for message in consumer:
try:
data = json.loads(message.value)
print(f"Received from '{message.topic}': {data}")
except ValueError as e:
print(f"Error decoding message: {e}")
consume_from_kafka()
```
这两个程序假设`data.txt`是一个包含JSON格式个人信息的文本文件,同时Kafka服务器正在运行并且监听`localhost:9092`。
注意,生产者和消费者之间的通信需要相同的网络环境,而且这里使用的是基本的同步模式,生产者和消费者都阻塞直到完成操作。在实际应用中,可能会有更复杂的错误处理和性能优化需求。
阅读全文