kafka消费者poll
时间: 2023-08-18 20:04:48 浏览: 161
Kafka消费者的`poll`方法是用来从Kafka集群中拉取消息的主要方法。当消费者调用`poll`时,它会向Kafka集群发送拉取请求,并等待一段时间以接收新的消息。
`poll`方法有一个可选的参数,用于指定等待时间,即在没有新消息可拉取时,消费者将等待的最长时间。如果没有指定等待时间或指定为0,`poll`方法将立即返回,不论是否有新消息可用。
当`poll`方法返回时,它将返回一个记录集合,即消费者从Kafka拉取到的消息。消费者可以遍历这个记录集合,逐条处理每条消息。
示例代码如下:
```java
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value());
}
}
```
在上述示例中,我们创建了一个Kafka消费者,并订阅了一个名为"my-topic"的主题。然后在一个无限循环中,我们不断调用`poll`方法以拉取新的消息,并对每条消息进行处理。
阅读全文