python kafka多线程消费
时间: 2023-08-26 16:05:12 浏览: 61
在Python中使用kafka-python库可以很容易地实现多线程消费kafka消息。以下是一个简单的示例代码:
```python
from kafka import KafkaConsumer
import threading
def consume():
consumer = KafkaConsumer('topic_name', bootstrap_servers=['localhost:9092'])
for message in consumer:
print("Thread ID: %s, Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" %
(threading.current_thread().ident, message.topic, message.partition, message.offset, message.key, message.value))
if __name__ == '__main__':
threads = []
for i in range(3):
t = threading.Thread(target=consume)
threads.append(t)
t.start()
for t in threads:
t.join()
```
在这个例子中,我们创建了三个消费者线程,每个线程都使用相同的KafkaConsumer实例来消费消息。在消费循环中,我们打印了每个消息的一些元数据(主题、分区、偏移量、键和值),以便我们可以看到消息被哪个线程处理。
请注意,KafkaConsumer是线程安全的,因此可以在多个线程中共享。但是,由于KafkaConsumer是一个长时间运行的对象,因此请确保正确关闭它以避免资源泄漏。