请使用confluent_kafka,生产者和消费者的python3代码,生产者可指定输入字符串,消费者可指定输出文件
时间: 2024-03-05 18:49:54 浏览: 56
生产者消费者的python3实现
好的,以下是使用`confluent_kafka`实现生产者和消费者的Python3代码,生产者可以指定输入字符串,消费者可以指定输出文件:
```python
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
def kafka_producer(topic, input_str):
# Kafka producer configuration
conf = {'bootstrap.servers': 'localhost:9092'}
# Create Kafka producer instance
producer = Producer(conf)
try:
# Produce message to Kafka topic
producer.produce(topic, key='0', value=input_str)
producer.flush()
print("Produced message: {}".format(input_str))
except KafkaException as e:
print("Exception occurred: {}".format(e))
finally:
# Close Kafka producer instance
producer.close()
def kafka_consumer(topic, output_file):
# Kafka consumer configuration
conf = {'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'}
# Create Kafka consumer instance
consumer = Consumer(conf)
# Subscribe to Kafka topic
consumer.subscribe([topic])
try:
while True:
# Poll for Kafka messages
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached: {}'.format(msg))
else:
print('Error occurred: {}'.format(msg.error()))
else:
# Write Kafka message to output file
with open(output_file, 'a') as f:
f.write("{}: {}\n".format(msg.key(), msg.value()))
print("Consumed message: {}".format(msg.value()))
except KafkaException as e:
print("Exception occurred: {}".format(e))
finally:
# Close Kafka consumer instance
consumer.close()
# Main function
if __name__ == '__main__':
# Test producer and consumer functions
topic = 'test_topic'
input_str = 'Hello world!'
output_file = 'output.txt'
kafka_producer(topic, input_str)
kafka_consumer(topic, output_file)
```
在此示例中,我们定义了两个函数`kafka_producer`和`kafka_consumer`,分别用于生产者和消费者。生产者函数将指定的输入字符串发送到Kafka主题,消费者函数从主题中消费消息,并将其写入指定的输出文件中。
为了测试代码,请确保您已经在本地安装了Kafka,并创建了一个名为`test_topic`的主题。然后,在终端中运行上述代码,您应该能够看到生产者函数将消息发送到主题,消费者函数从主题中读取消息并将其写入`output.txt`文件中。
阅读全文