Getting a Kafka topic's offsets in Java without subscribing to the topic
Date: 2023-10-08 13:06:32 Views: 94
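If you want to read a Kafka topic's offsets without subscribing to the topic, you can use Kafka's AdminClient API. The following example fetches the latest offset of one partition of a topic: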
```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class KafkaTopicOffsetExample {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = "my-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Configure the AdminClient; no group.id or deserializers are needed
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        // try-with-resources closes the AdminClient even if a call throws
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Fetch the topic description.
            // allTopicNames() needs kafka-clients 3.1+; older clients use all() instead.
            Set<String> topicNames = Collections.singleton(TOPIC_NAME);
            TopicDescription topicDesc =
                    adminClient.describeTopics(topicNames).allTopicNames().get().get(TOPIC_NAME);

            // Fetch the topic's partitions
            List<TopicPartitionInfo> partitions = topicDesc.partitions();
            System.out.println("Partition count: " + partitions.size());

            // Fetch the latest offset of partition 0
            TopicPartition partition0 = new TopicPartition(TOPIC_NAME, 0);
            ListOffsetsResult listOffsetsResult =
                    adminClient.listOffsets(Collections.singletonMap(partition0, OffsetSpec.latest()));
            // ListOffsetsResult exposes partitionResult() and all(); it has no values() method
            long offset = listOffsetsResult.partitionResult(partition0).get().offset();
            System.out.println("Offset: " + offset);
        }
    }
}
```
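This example uses Kafka's AdminClient class to read a topic's offset, so no consumer, consumer group, or subscription is involved. It first creates an AdminClient instance and fetches the topic description and its partitions. It then calls listOffsets() with OffsetSpec.latest() to get the latest offset, which is the offset at which the next record will be written. Finally, it prints the offset and closes the AdminClient. Note that the example only fetches the offset of a single partition (partition 0). To get the offsets of every partition, pass listOffsets() a Map<TopicPartition, OffsetSpec> with one entry per partition, not a plain list of TopicPartition objects.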
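The all-partitions case described above can be sketched as follows. This is a minimal sketch, not a definitive implementation: the class name `AllPartitionOffsets` and the helper `latestOffsetRequest` are hypothetical names introduced for illustration, the broker address and topic name are the same placeholder values as in the example above, and `allTopicNames()` assumes kafka-clients 3.1 or newer.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class AllPartitionOffsets {
    // Placeholder values (assumptions), same as in the single-partition example
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC_NAME = "my-topic";

    // Hypothetical helper: builds the map that listOffsets() expects,
    // requesting the latest offset for each given partition id.
    public static Map<TopicPartition, OffsetSpec> latestOffsetRequest(
            String topic, List<Integer> partitionIds) {
        Map<TopicPartition, OffsetSpec> request = new HashMap<>();
        for (int partitionId : partitionIds) {
            request.put(new TopicPartition(topic, partitionId), OffsetSpec.latest());
        }
        return request;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Discover the partition ids from the topic description
            TopicDescription topicDesc = adminClient
                    .describeTopics(Collections.singleton(TOPIC_NAME))
                    .allTopicNames().get().get(TOPIC_NAME);
            List<Integer> partitionIds = new ArrayList<>();
            for (TopicPartitionInfo info : topicDesc.partitions()) {
                partitionIds.add(info.partition());
            }

            // One listOffsets() call covers every partition in the request map
            Map<TopicPartition, ListOffsetsResultInfo> offsets = adminClient
                    .listOffsets(latestOffsetRequest(TOPIC_NAME, partitionIds))
                    .all().get();

            offsets.forEach((tp, info) ->
                    System.out.println(tp + " latest offset: " + info.offset()));
        }
    }
}
```

Swapping `OffsetSpec.latest()` for `OffsetSpec.earliest()` in the helper would request each partition's earliest available offset instead. Running `main` requires a reachable broker at the configured address; the helper itself only builds the request map and makes no network calls.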