python kafka 代码示例
时间: 2023-10-05 15:14:03 浏览: 91
好的,以下是 Python 的 Kafka 代码示例:
```python
from kafka import KafkaProducer
from kafka import KafkaConsumer
# 生产者示例,用于向 Kafka 发送消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test-topic', b'hello world')
# 消费者示例,用于从 Kafka 接收消息
consumer = KafkaConsumer('test-topic', bootstrap_servers='localhost:9092')
for msg in consumer:
print(msg)
```
注意,这里需要安装 kafka-python 库。并且需要替换 bootstrap_servers 参数为你实际的 Kafka 服务器地址。
相关问题
python kafka 代码示例
以下是一个简单的 Python Kafka 代码示例,用于向 Kafka 主题发送消息:
```python
from kafka import KafkaProducer
# 设置 Kafka 服务器地址和主题名称
bootstrap_servers = ['localhost:9092']
topicName = 'test_topic'
# 创建 Kafka 生产者
producer = KafkaProducer(bootstrap_servers = bootstrap_servers)
# 发送消息到 Kafka 主题
producer.send(topicName, b'Hello, World!')
# 关闭 Kafka 生产者连接
producer.close()
```
请注意,这只是一个简单的示例,您需要根据您的实际场景修改代码以满足您的需求。
python kafka json
在 Python 中使用 Kafka 发送和接收 JSON 格式的数据,可以使用 `json` 模块将 JSON 转换为字符串或字节流,并使用 Kafka 客户端库将其发送到 Kafka 集群。以下是一个简单的示例代码:
```python
from kafka import KafkaProducer, KafkaConsumer
import json
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
consumer = KafkaConsumer('my_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda x: json.loads(x.decode('utf-8')))
data = {'name': 'John', 'age': 30}
producer.send('my_topic', value=data)
for message in consumer:
print(message.value)
```
在这个示例中,我们首先创建了一个 `KafkaProducer` 实例,并将 `value_serializer` 参数设置为将 Python 对象转换为 JSON 字符串。然后,我们使用 `send()` 方法将 JSON 数据发送到 Kafka 主题。接下来,我们创建一个 `KafkaConsumer` 实例,并将 `value_deserializer` 参数设置为将 JSON 字符串转换为 Python 对象。最后,我们使用 `consumer` 读取 Kafka 主题中的 JSON 数据,并将其打印到控制台。
需要注意的是,当使用 JSON 格式发送和接收数据时,需要确保生产者和消费者都使用相同的 JSON 序列化和反序列化方式。
阅读全文