题目要求为:把 JSON 格式数据放入 Kafka 发送出去,然后,再从 Kafka 中获取到 JSON 格式数据,对其进行解析并写入到 MySQL 数据库,Topic 为 json_topic。 生产者程序 producer_json.py、消费者程序
时间: 2024-09-28 20:06:17 浏览: 82
Flume 抽取MYSQL Oracle数据 JSON格式 推送Kafka
要完成这个任务,你需要两个部分的代码:生产者程序(producer_json.py)和消费者程序(consumer_json.py)。这里是一个简化的示例来说明如何使用Python的`pika`(用于Kafka)和`json`(处理JSON数据)以及`mysql-connector-python`(连接MySQL数据库)库。
**生产者程序 - producer_json.py**
```python
import pika
import json
# 连接到Kafka broker
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建一个topic
channel.exchange_declare(exchange='json_topic', exchange_type='topic')
# 准备要发送的JSON数据
data = {
"key": "value",
"nested_data": {"another_key": "another_value"}
}
# 将JSON编码为字符串并发送消息
message = json.dumps(data)
channel.basic_publish(exchange='json_topic', routing_key='json.key', body=message)
print(f"消息已发送: {message}")
# 关闭连接
connection.close()
```
**消费者程序 - consumer_json.py**
```python
import pika
import json
import mysql.connector
def callback(ch, method, properties, body):
try:
# 解析接收到的JSON数据
received_data = json.loads(body)
# 连接到MySQL数据库
cnx = mysql.connector.connect(user='your_username', password='your_password',
host='localhost',
database='your_database')
cursor = cnx.cursor()
# 插入数据到表中
sql_query = f"INSERT INTO your_table (column1, column2) VALUES (%s, %s)"
data_values = (received_data['key'], received_data['nested_data']['another_key'])
cursor.execute(sql_query, data_values)
# 提交事务并关闭连接
cnx.commit()
print("Data inserted successfully.")
cursor.close()
cnx.close()
except Exception as e:
print(f"Error: {e}")
ch.basic_reject(delivery_tag=method.delivery_tag)
if __name__ == "__main__":
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='json.key') # 消费指定主题下的消息
# 设置回调函数处理接收到的消息
channel.basic_consume(queue='json.key', on_message_callback=callback, auto_ack=True)
print("开始消费...")
channel.start_consuming()
```
注意:请替换上述代码中的数据库连接信息和SQL查询中的表名、字段名等,以及生产者的`routing_key`(根据实际需要调整)。
阅读全文