Java 获取Kafka的topic的offset
时间: 2024-02-20 19:57:44 浏览: 98
Kafka客户端开发实例java源码.zip
要获取Kafka的topic的offset,可以使用KafkaConsumer类中的方法。以下是Java代码示例:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class KafkaOffsetGetter {
public static void main(String[] args) {
// Kafka consumer configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Create Kafka consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Assign topic partitions
List<TopicPartition> partitions = new ArrayList<>();
TopicPartition partition0 = new TopicPartition("my-topic", 0);
TopicPartition partition1 = new TopicPartition("my-topic", 1);
partitions.add(partition0);
partitions.add(partition1);
consumer.assign(partitions);
// Seek to beginning of topic
consumer.seekToBeginning(partitions);
// Print offset for each partition
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Partition: " + record.partition() +
", Offset: " + record.offset() +
", Value: " + record.value());
}
}
}
}
```
在上面的代码中,我们创建了一个KafkaConsumer实例,并使用assign()方法指定要消费的topic和分区,然后使用seekToBeginning()方法将消费者定位到该topic的开头。接着,我们使用poll()方法从Kafka中获取一批记录,并遍历这些记录以获取每个分区的当前offset。
阅读全文