java获取kafka中每个topic里包含多少条数据
时间: 2024-05-02 19:21:03 浏览: 133
可以使用Kafka的Java API来获取每个topic里包含的消息数量。以下是一个简单的示例代码:
```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
public class KafkaTopicCount {
public static void main(String[] args) {
// 设置Kafka的配置属性
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 创建AdminClient实例
AdminClient adminClient = AdminClient.create(props);
// 获取所有topic的列表
ListTopicsResult topics = adminClient.listTopics();
// 遍历每个topic,并获取其分区的数量和每个分区里的消息数量
topics.names().get().forEach(topic -> {
try {
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topic)).all().get().get(topic);
int partitionCount = topicDescription.partitions().size();
long totalCount = topicDescription.partitions().stream()
.mapToLong(TopicPartitionInfo::replicas)
.sum();
System.out.println("Topic: " + topic + ", Partition Count: " + partitionCount + ", Total Message Count: " + totalCount);
} catch (Exception e) {
e.printStackTrace();
}
});
// 关闭AdminClient实例
adminClient.close();
}
}
```
以上代码首先创建了一个AdminClient实例,然后通过`listTopics()`方法获取所有topic的列表。接下来,遍历每个topic,并通过`describeTopics()`方法获取其分区的数量和每个分区里的消息数量。最后,输出每个topic的名称、分区数量和消息数量。
阅读全文