consumer.poll
时间: 2024-03-19 13:36:19 浏览: 23
根据提供的引用内容,我们可以了解到消费者是Kafka中的一个重要组件,而`consumer.poll()`是消费者API中的一个方法,用于从Kafka集群中拉取数据并返回给消费者。具体来说,`consumer.poll()`会从Kafka集群中获取一批消息,然后将这些消息返回给消费者进行处理。如果没有可用的消息,则`consumer.poll()`方法会阻塞等待,直到有新的消息可用为止。
下面是一个使用`consumer.poll()`方法的示例代码:
```python
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_topic', bootstrap_servers=['localhost:9092'])
while True:
messages = consumer.poll(timeout_ms=1000)
if not messages:
continue
for partition, message_list in messages.items():
for message in message_list:
print(f"Received message: {message.value.decode('utf-8')}")
```
上述代码中,我们首先创建了一个`KafkaConsumer`对象,并指定了要消费的主题和Kafka集群的地址。然后,我们使用一个无限循环来不断地调用`consumer.poll()`方法,从Kafka集群中获取消息并进行处理。如果没有可用的消息,则`consumer.poll()`方法会在指定的超时时间内阻塞等待,直到有新的消息可用为止。