java设置kafka从beginning开始消费
时间: 2023-07-07 11:34:54 浏览: 113
在 Java 中,可以通过设置 KafkaConsumer 的 `auto.offset.reset` 参数来设置消费者从哪里开始消费。如果设置为 `earliest`,则消费者从最早的偏移量开始消费;如果设置为 `latest`,则消费者从最新的偏移量开始消费。
以下是一个示例代码,展示如何设置 KafkaConsumer 从最早的偏移量开始消费:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // 设置从最早的偏移量开始消费
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
```
在上面的代码中,我们创建了一个 KafkaConsumer,并设置了 `auto.offset.reset` 参数为 `earliest`,以便从最早的偏移量开始消费。然后我们订阅了一个名为 `test-topic` 的主题,并在一个无限循环中调用 `poll` 方法来消费消息。当有消息到达时,我们就打印出消息的偏移量、键和值。
阅读全文