kafka getOffset
时间: 2023-07-07 17:44:14 浏览: 68
kafkaoffsetmonitor
Kafka 提供了 `Consumer` 接口的 `committed()` 方法来获取消费者组中某个消费者的消费偏移量。这个方法接收一个 `TopicPartition` 对象作为参数,返回该分区的消费偏移量。如果消费者尚未提交过偏移量,则返回 `null`。
示例代码如下:
```java
Consumer<String, String> consumer = new KafkaConsumer<>(props);
List<TopicPartition> partitions = Arrays.asList(new TopicPartition("test", 0));
consumer.assign(partitions);
// 获取消费者组中消费者的消费偏移量
for (TopicPartition partition : partitions) {
OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
if (offsetAndMetadata != null) {
System.out.println("Committed offset for partition " + partition.partition() + ": " + offsetAndMetadata.offset());
} else {
System.out.println("No committed offset for partition " + partition.partition());
}
}
```
上述代码中,我们首先通过 `assign()` 方法指定要消费的分区,然后通过 `committed()` 方法获取消费者组中消费者的消费偏移量。如果分区的偏移量已经提交,则打印偏移量;否则打印“未提交偏移量”的提示信息。
阅读全文