kafka poll 过时
时间: 2023-10-02 19:05:36 浏览: 77
kafka-3.5.1
从 Kafka 2.0 版本开始,使用 poll 方法消费 Kafka 消息已被标记为过时的 API。相反,建议使用 subscribe 和 assign 方法进行消费。subscribe 方法允许使用组管理(group management)来消费主题上的消息,而 assign 则允许直接分配分区(partitions)来消费消息。这些方法的使用方式与 poll 方法略有不同,需要先创建一个 KafkaConsumer 对象,然后调用相应的方法来订阅或分配分区。以下是一个使用 subscribe 方法进行消费的示例:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
```
这个示例中,我们首先创建了一个 KafkaConsumer 对象,并使用 subscribe 方法订阅了一个主题。然后在一个无限循环中,调用 poll 方法来获取消息记录,并对每条记录进行处理。需要注意的是,poll 方法的参数指定了等待时间,这里我们设置为 100 毫秒。如果 poll 方法在等待时间内没有获取到任何消息,它会返回一个空的 ConsumerRecords 对象。
阅读全文