编写一个python实现的kafka模块供云查服务和爬虫模块数据通信、爬虫给识别模块数据通信,包含两个模块化函数producer和consumer。要求自测实现传递数据并发性良好
时间: 2024-03-02 07:49:43 浏览: 16
以下是一个简单的Python实现的Kafka模块供云查服务和爬虫模块数据通信、爬虫给识别模块数据通信,包含两个模块化函数producer和consumer。
```python
from kafka import KafkaProducer, KafkaConsumer
import json
def producer(data, topic):
"""
data: 要发送的数据,类型为字典
topic: 要发送到的kafka主题
"""
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send(topic, value=data)
producer.flush()
def consumer(topic):
"""
topic: 要消费的kafka主题
"""
consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
yield message.value
```
使用方法:
1. 在需要发送数据的地方,调用producer函数,传入要发送的数据和要发送到的主题,即可将数据发送到指定的主题。
```python
data = {'name': 'John', 'age': 30, 'city': 'New York'}
producer(data, 'test_topic')
```
2. 在需要接收数据的地方,调用consumer函数,传入要消费的主题,即可获取该主题下的所有数据。
```python
for message in consumer('test_topic'):
print(message)
```
注意:需要先安装kafka-python包,可以使用pip进行安装。同时,需要在本地启动Kafka服务,使用默认端口号9092。