Python 通过kazoo函数实现连接至kafka获取topic消费的信息 并且在不存在topic时抛出异常
时间: 2024-02-03 22:12:09 浏览: 58
python kafka客户端消费样例
可以使用 `kazoo` 库来连接至 `kafka` 并获取 `topic` 的消费信息。以下是示例代码:
```python
from kazoo.client import KazooClient
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
zk_host = 'localhost:2181'
kafka_host = 'localhost:9092'
topic = 'my-topic'
# Connect to ZooKeeper
zk = KazooClient(hosts=zk_host)
zk.start()
# Check if the topic exists
if not zk.exists('/brokers/topics/{}'.format(topic)):
raise Exception('Topic {} does not exist.'.format(topic))
# Connect to Kafka
try:
consumer = KafkaConsumer(
topic,
bootstrap_servers=kafka_host,
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group'
)
except NoBrokersAvailable:
raise Exception('No brokers available at {}.'.format(kafka_host))
# Consume messages from Kafka
for message in consumer:
print(message.value)
```
在上面的代码中,我们首先连接到 `ZooKeeper`,然后检查是否存在指定的 `topic`。如果 `topic` 不存在,我们就抛出一个异常。接下来,我们尝试连接到 `Kafka`,如果连接失败,同样抛出一个异常。最后,我们使用 `KafkaConsumer` 对象消费消息。
请注意,以上代码仅供参考,实际实现可能因环境和需求而异。
阅读全文