请使用confluent_kafka,给我一份多线程生产者和消费者的python3代码,生产者可指定输入字符串,消费者可指定输出文件
时间: 2024-03-05 07:49:50 浏览: 149
好的,以下是使用`confluent_kafka`实现多线程生产者和消费者的Python3代码,生产者可以指定输入字符串,消费者可以指定输出文件:
```python
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import threading
class ProducerThread(threading.Thread):
def __init__(self, topic, input_str):
threading.Thread.__init__(self)
self.topic = topic
self.input_str = input_str
# Kafka producer configuration
self.conf = {'bootstrap.servers': 'localhost:9092'}
def run(self):
# Create Kafka producer instance
producer = Producer(self.conf)
try:
# Produce message to Kafka topic
producer.produce(self.topic, key=str(threading.get_ident()), value=self.input_str)
producer.flush()
print("Produced message: {}".format(self.input_str))
except KafkaException as e:
print("Exception occurred: {}".format(e))
class ConsumerThread(threading.Thread):
def __init__(self, topic, output_file):
threading.Thread.__init__(self)
self.topic = topic
self.output_file = output_file
# Kafka consumer configuration
self.conf = {'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'}
def run(self):
# Create Kafka consumer instance
consumer = Consumer(self.conf)
# Subscribe to Kafka topic
consumer.subscribe([self.topic])
try:
while True:
# Poll for Kafka messages
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached: {}'.format(msg))
else:
print('Error occurred: {}'.format(msg.error()))
else:
# Write Kafka message to output file
with open(self.output_file, 'a') as f:
f.write("{}: {}\n".format(msg.key(), msg.value()))
print("Consumed message: {}".format(msg.value()))
except KafkaException as e:
print("Exception occurred: {}".format(e))
# Main function
if __name__ == '__main__':
# Create producer and consumer threads
producer_thread_1 = ProducerThread('test_topic', 'Hello world 1!')
producer_thread_2 = ProducerThread('test_topic', 'Hello world 2!')
consumer_thread = ConsumerThread('test_topic', 'output.txt')
# Start threads
producer_thread_1.start()
producer_thread_2.start()
consumer_thread.start()
# Wait for threads to complete
producer_thread_1.join()
producer_thread_2.join()
consumer_thread.join()
```
在此示例中,我们创建了一个名为`ProducerThread`的生产者线程和一个名为`ConsumerThread`的消费者线程。生产者线程将指定的输入字符串发送到Kafka主题,消费者线程从主题中消费消息,并将其写入指定的输出文件中。
为了测试代码,请确保您已经在本地安装了Kafka,并创建了一个名为`test_topic`的主题。然后,在终端中运行上述代码,您应该能够看到生产者线程将两个消息发送到主题,消费者线程从主题中读取消息并将其写入`output.txt`文件中。
阅读全文