Python 和 Node.js 通过消息队列Kafka进行数据交互的例子
时间: 2024-02-25 12:57:56 浏览: 139
kafka-python批量发送数据的实例
下面是一个使用 Kafka 作为消息队列,实现 Python 和 Node.js 之间数据交互的示例:
1. 安装依赖
在 Python 中使用 Kafka 需要安装 kafka-python 库:
```
pip install kafka-python
```
在 Node.js 中使用 Kafka 需要安装 kafka-node 库:
```
npm install kafka-node
```
2. 启动 Kafka
在本地启动一个 Kafka 集群,可以使用 Docker Compose 快速启动:
```yaml
version: '3'
services:
zookeeper:
image: zookeeper:3.4.14
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.13-2.8.0
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
```
3. 编写 Python 程序
```python
from kafka import KafkaProducer, KafkaConsumer
import time
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# 发送消息到 Kafka
def send_message():
while True:
now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
producer.send('test', now.encode('utf-8'))
time.sleep(1)
# 接收 Kafka 消息
def receive_message():
consumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092'])
for msg in consumer:
print(msg.value.decode('utf-8'))
if __name__ == '__main__':
send_message()
```
上面的程序中,使用 KafkaProducer 向名为 'test' 的主题发送消息,消息内容为当前时间字符串。同时使用 KafkaConsumer 从 'test' 主题接收消息,并将消息打印出来。
4. 编写 Node.js 程序
```javascript
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })
// 发送消息到 Kafka
async function send_message() {
await producer.connect()
while (true) {
const now = new Date().toISOString()
await producer.send({
topic: 'test',
messages: [{ value: now }]
})
console.log(`Sent: ${now}`)
await new Promise(resolve => setTimeout(resolve, 1000))
}
}
// 接收 Kafka 消息
async function receive_message() {
await consumer.connect()
await consumer.subscribe({ topic: 'test', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(message.value.toString())
},
})
}
if (require.main === module) {
send_message()
}
```
上面的程序中,使用 KafkaJS 库创建 Kafka 客户端,并且使用 producer 向名为 'test' 的主题发送消息,消息内容为当前时间字符串。同时使用 consumer 从 'test' 主题接收消息,并将消息打印出来。
5. 运行程序
分别在 Python 和 Node.js 的项目目录下运行上述程序即可实现数据交互。需要注意的是,在运行程序之前,需要先启动 Kafka 集群。
阅读全文