python kafka json
时间: 2023-06-29 19:06:36 浏览: 221
python消费入库脚本
在 Python 中使用 Kafka 发送和接收 JSON 格式的数据,可以使用 `json` 模块将 JSON 转换为字符串或字节流,并使用 Kafka 客户端库将其发送到 Kafka 集群。以下是一个简单的示例代码:
```python
from kafka import KafkaProducer, KafkaConsumer
import json
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
consumer = KafkaConsumer('my_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda x: json.loads(x.decode('utf-8')))
data = {'name': 'John', 'age': 30}
producer.send('my_topic', value=data)
for message in consumer:
print(message.value)
```
在这个示例中,我们首先创建了一个 `KafkaProducer` 实例,并将 `value_serializer` 参数设置为将 Python 对象转换为 JSON 字符串。然后,我们使用 `send()` 方法将 JSON 数据发送到 Kafka 主题。接下来,我们创建一个 `KafkaConsumer` 实例,并将 `value_deserializer` 参数设置为将 JSON 字符串转换为 Python 对象。最后,我们使用 `consumer` 读取 Kafka 主题中的 JSON 数据,并将其打印到控制台。
需要注意的是,当使用 JSON 格式发送和接收数据时,需要确保生产者和消费者都使用相同的 JSON 序列化和反序列化方式。
阅读全文