I want the topic's offset, not a partition's offset
Time: 2024-02-20 10:57:47 Views: 37
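You can read a Kafka topic's offsets with the `listOffsets()` method of `AdminClient`. Kafka tracks offsets per partition, so a topic has no single offset of its own; a topic-level view is built from the offsets of its partitions. Here is a Java example that queries one partition: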
```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaTopicOffsetGetter {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Kafka admin client configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // try-with-resources closes the admin client even if a call fails
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Offsets are tracked per partition; this queries partition 0 of "my-topic"
            TopicPartition topicPartition = new TopicPartition("my-topic", 0);

            // READ_UNCOMMITTED also counts records of transactions that are not yet committed
            ListOffsetsOptions listOffsetsOptions =
                    new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED);

            // Ask for the latest (end) offset
            OffsetSpec offsetSpec = OffsetSpec.latest();

            // List offsets for the topic partition and wait for the result
            Map<TopicPartition, ListOffsetsResultInfo> offsets = adminClient
                    .listOffsets(Collections.singletonMap(topicPartition, offsetSpec), listOffsetsOptions)
                    .all()
                    .get();

            // Print the offset for the topic partition
            ListOffsetsResultInfo info = offsets.get(topicPartition);
            System.out.println("Topic: " + topicPartition.topic() +
                    ", Partition: " + topicPartition.partition() +
                    ", Offset: " + info.offset());
        }
    }
}
```
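In the code above, we create a Kafka `AdminClient` instance and call `listOffsets()` to get the offset of a partition of the given topic. `OffsetSpec` selects which offset to fetch (here the latest one), and `ListOffsetsOptions` sets request options such as whether uncommitted messages are included. Finally, `all()` returns a future for the results, and we print the offset once it completes. Because the request map takes one entry per `TopicPartition`, covering the whole topic means adding an entry for each of its partitions.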
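Since the question asks for a topic-level number, the per-partition offsets still have to be combined somehow. The following is a minimal sketch of one way to do that, assuming you have already collected the beginning and end offsets of every partition into plain maps keyed by partition number. `TopicOffsetSummary` and its methods are hypothetical helpers for illustration, not part of the Kafka API.

```java
import java.util.Map;

// Hypothetical helper (not a Kafka class): combines per-partition offsets
// into topic-level figures. Keys are partition numbers, values are offsets.
final class TopicOffsetSummary {
    private TopicOffsetSummary() {}

    /** Sum of the end (latest) offsets of all partitions. */
    static long totalEndOffset(Map<Integer, Long> endOffsets) {
        long total = 0L;
        for (long offset : endOffsets.values()) {
            total += offset;
        }
        return total;
    }

    /**
     * Sum over partitions of (end offset - beginning offset).
     * A partition missing from beginningOffsets is assumed to start at 0.
     */
    static long offsetSpan(Map<Integer, Long> beginningOffsets, Map<Integer, Long> endOffsets) {
        long total = 0L;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long begin = beginningOffsets.getOrDefault(entry.getKey(), 0L);
            total += entry.getValue() - begin;
        }
        return total;
    }
}
```

The maps would typically be filled from two `listOffsets()` calls, one with `OffsetSpec.earliest()` and one with `OffsetSpec.latest()`. Treat the span as an approximation of the record count: on compacted or transactional topics some offsets in the range may not correspond to readable records.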