把 JSON 格式数据放入 Kafka 发送出去,然后,再从 Kafka 中获取到 JSON 格式数据,对其进行解析并写入到 MySQL 数据库,Topic 为 json_topic。 生产者程序 producer_json.py、消费者程序 consumer_json.py
时间: 2024-12-13 07:22:12 浏览: 9
在Python中,我们可以使用Kafka的`kafka-python`库和`json`模块以及MySQL的`pymysql`库来完成这个任务。以下是生产者(producer_json.py)和消费者的(consumer_json.py)基本步骤:
**生产者(producer_json.py):**
```python
import json
from kafka import KafkaProducer
import pymysql
# 创建Kafka Producer实例
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda x: json.dumps(x).encode('utf-8'))
# 数据准备示例
data = {'key': 'value', 'timestamp': int(time.time())}
# 将JSON数据发送到Kafka Topic
try:
topic_name = 'json_topic'
producer.send(topic_name, data)
producer.flush() # 确保消息已发送
except Exception as e:
print(f"Error sending to Kafka: {e}")
# 连接MySQL数据库
mysql_conn = None
try:
mysql_conn = pymysql.connect(host="localhost", user="your_user", password="your_password", db="your_db")
with mysql_conn.cursor() as cursor:
sql = "INSERT INTO your_table (json_column) VALUES (%s)"
cursor.execute(sql, (json.dumps(data),))
mysql_conn.commit()
except (pymysql.Error, ConnectionError) as e:
print(f"Error connecting to MySQL: {e}")
finally:
if mysql_conn:
mysql_conn.close()
```
**消费者(consumer_json.py):**
```python
from kafka import KafkaConsumer
import json
import pymysql
# 创建Kafka Consumer实例
consumer = KafkaConsumer('json_topic',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
# 消费消息并处理
for message in consumer:
data = message.value
try:
# 连接MySQL数据库
mysql_conn = pymysql.connect(host="localhost", user="your_user", password="your_password", db="your_db")
with mysql_conn.cursor() as cursor:
sql = "INSERT INTO your_table (json_column) VALUES (%s)"
cursor.execute(sql, (json.dumps(data),))
mysql_conn.commit()
except (pymysql.Error, ConnectionError) as e:
print(f"Error saving to MySQL: {e}")
finally:
consumer.close()
```
请注意,上述代码需要你根据实际情况替换服务器地址、数据库连接信息以及表名和列名。同时,在实际环境中,生产者和消费者可能需要进行异常处理和错误恢复,例如定期重新连接Kafka或MySQL。
阅读全文