How do I get the metadata of a Kafka topic, with a Java implementation?
Date: 2024-02-21 07:01:18
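You can use the AdminClient class from the Kafka Java API to fetch the metadata of a Kafka topic, such as its partitions and, for each partition, the leader, replicas, and in-sync replicas. An implementation looks like this: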
```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

public class KafkaTopicMetadata {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        // Broker address; adjust to match your cluster.
        props.setProperty("bootstrap.servers", "localhost:9092");

        // try-with-resources closes the client when we are done.
        try (AdminClient adminClient = AdminClient.create(props)) {
            String topicName = "test-topic";
            DescribeTopicsResult describeTopicsResult =
                    adminClient.describeTopics(Collections.singleton(topicName));
            // Note: newer client versions deprecate all() in favor of allTopicNames().
            KafkaFuture<Map<String, TopicDescription>> kafkaFuture = describeTopicsResult.all();
            // Blocks until the broker answers; throws ExecutionException if the topic is unknown.
            Map<String, TopicDescription> topicDescriptionMap = kafkaFuture.get();
            TopicDescription topicDescription = topicDescriptionMap.get(topicName);
            List<TopicPartitionInfo> topicPartitionInfoList = topicDescription.partitions();

            System.out.println("Topic: " + topicName);
            System.out.println("Partitions: " + topicPartitionInfoList.size());
            for (TopicPartitionInfo topicPartitionInfo : topicPartitionInfoList) {
                // leader() returns null when the partition currently has no leader.
                Node leader = topicPartitionInfo.leader();
                List<Node> replicas = topicPartitionInfo.replicas();
                List<Node> isr = topicPartitionInfo.isr();
                System.out.println("Partition: " + topicPartitionInfo.partition());
                System.out.println("Leader: "
                        + (leader == null ? "none" : leader.host() + ":" + leader.port()));
                System.out.println("Replicas: " + replicas);
                System.out.println("Isr: " + isr);
            }
        }
    }
}
```
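The code above first creates an AdminClient and calls its describeTopics method to fetch the metadata of the given topic. It then takes the matching TopicDescription from the returned Map and reads the partition list from it. Finally, it iterates over the partitions and prints each partition's leader, replicas, and in-sync replicas (ISR).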
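One common use of the replica and ISR lists is spotting under-replicated partitions, meaning partitions whose ISR is smaller than their replica set. The following is only a minimal sketch of that comparison, written so it runs without a broker. `IsrCheck`, `PartitionView`, and `underReplicated` are hypothetical names introduced here for illustration and are not part of the Kafka API. In real code the broker ids would come from `TopicPartitionInfo.replicas()` and `TopicPartitionInfo.isr()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IsrCheck {
    /** Hypothetical stand-in for the fields read from TopicPartitionInfo. */
    public static final class PartitionView {
        final int partition;
        final List<Integer> replicaIds; // broker ids of all assigned replicas
        final List<Integer> isrIds;     // broker ids of the in-sync replicas

        public PartitionView(int partition, List<Integer> replicaIds, List<Integer> isrIds) {
            this.partition = partition;
            this.replicaIds = replicaIds;
            this.isrIds = isrIds;
        }
    }

    /** Returns the ids of partitions whose ISR is smaller than their replica set. */
    public static List<Integer> underReplicated(List<PartitionView> partitions) {
        List<Integer> result = new ArrayList<>();
        for (PartitionView p : partitions) {
            if (p.isrIds.size() < p.replicaIds.size()) {
                result.add(p.partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<PartitionView> partitions = new ArrayList<>();
        // Partition 0: all three replicas are in sync.
        partitions.add(new PartitionView(0, Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 3)));
        // Partition 1: only broker 2 is in sync, so it is under-replicated.
        partitions.add(new PartitionView(1, Arrays.asList(1, 2, 3), Arrays.asList(2)));
        System.out.println("Under-replicated partitions: " + underReplicated(partitions));
        // prints "Under-replicated partitions: [1]"
    }
}
```

The check compares list sizes only, which should be enough here on the assumption that the ISR is always a subset of the assigned replicas.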