借助Python编写生产者程序,读取教师机数据库school中student表的数据内容,将其转为JSON格式,发送给Kafka。再编写消费者程序,从Kafka中获取到相应JSON格式数据,打印出来。
时间: 2024-11-10 20:25:31 浏览: 28
使用python将mysql数据库的数据转换为json数据的方法
5星 · 资源好评率100%
在Python中,你可以使用pandas库读取SQL数据库中的数据,然后使用kafka-python库操作Kafka。以下是基本步骤:
首先,安装所需的库:
```bash
pip install pandas kafka-python psycopg2-binary
```
然后,编写生产者程序:
```python
import json
import psycopg2
from kafka import KafkaProducer
# 连接数据库
def connect_to_db():
conn = psycopg2.connect(
host="your_host",
database="school",
user="your_user",
password="your_password"
)
return conn.cursor()
# 读取数据并转换为JSON
def read_data_and_produce(conn):
cur.execute("SELECT * FROM student")
data = cur.fetchall()
producer = KafkaProducer(bootstrap_servers='localhost:9092') # 指定Kafka服务器地址
for row in data:
json_row = json.dumps(row)
producer.send('student_data', json_row.encode('utf-8')) # 发送到'student_data'主题
if __name__ == "__main__":
conn = connect_to_db()
read_data_and_produce(conn)
conn.close()
```
接下来,编写消费者程序:
```python
from kafka import KafkaConsumer
def consume_data():
consumer = KafkaConsumer('student_data',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest')
for message in consumer:
print(f"Received: {message.value.decode('utf-8')}")
if __name__ == "__main__":
consume_data()
```
上述代码中,`connect_to_db()` 函数用于连接到教师机数据库,`read_data_and_produce()` 通过查询student表获取数据并转换为JSON发送到Kafka,`consume_data()`函数则负责从Kafka消费数据并打印。
阅读全文