使用 Java 代码查看消费组和主题的消费信息
时间: 2024-02-16 09:03:38 浏览: 24
可以使用 Kafka Java 客户端提供的 AdminClient 来查看消费组和主题的消费信息。以下是一个简单的示例代码:
```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
/**
 * Command-line example that inspects a Kafka cluster through {@link AdminClient}.
 *
 * <p>For every topic it prints the partition count and the offsets the configured
 * consumer group has committed on that topic; afterwards it prints the group's state.
 */
public class KafkaConsumerGroupInfo {

    /**
     * Connects to the cluster, prints topic/offset information and the group state.
     *
     * @param args unused
     * @throws Exception if listing topics or consumer groups fails, or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Kafka cluster address
        String bootstrapServers = "localhost:9092";
        // Consumer group ID
        String groupId = "test-group";

        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // try-with-resources closes the admin client even when a call below throws.
        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            // The group's offsets do not depend on the topic being printed, so fetch them once.
            Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(adminClient, groupId);
            printTopics(adminClient, offsets);
            printGroupState(adminClient, groupId);
        }
    }

    /**
     * Fetches the offsets committed by {@code groupId}. A broker-side failure is reported on
     * stderr and yields an empty map so that topic information can still be printed.
     */
    private static Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(
            AdminClient adminClient, String groupId) throws InterruptedException {
        try {
            ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(groupId);
            return offsetsResult.partitionsToOffsetAndMetadata().get();
        } catch (ExecutionException e) {
            System.err.println("Failed to fetch offsets for group " + groupId + ": " + e.getCause());
            return Collections.emptyMap();
        }
    }

    /**
     * Prints each topic's partition count followed by the committed offsets on that topic.
     * A failure to describe one topic is reported on stderr and does not stop the others.
     */
    private static void printTopics(AdminClient adminClient,
            Map<TopicPartition, OffsetAndMetadata> offsets)
            throws InterruptedException, ExecutionException {
        ListTopicsResult topicsResult = adminClient.listTopics();
        for (String topicName : topicsResult.names().get()) {
            TopicDescription topicDescription;
            try {
                // NOTE(review): values() is reportedly deprecated in newer kafka-clients in favor
                // of topicNameValues() - confirm against the client version in use.
                topicDescription = adminClient.describeTopics(Collections.singleton(topicName))
                        .values().get(topicName).get();
            } catch (ExecutionException e) {
                System.err.println("Failed to describe topic " + topicName + ": " + e.getCause());
                continue;
            }
            System.out.println("Topic: " + topicName + ", Partitions: " + topicDescription.partitions().size());

            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                if (!topicPartition.topic().equals(topicName)) {
                    continue;
                }
                OffsetAndMetadata offsetAndMetadata = entry.getValue();
                // The map value may be null when the group has no committed offset for the partition.
                if (offsetAndMetadata == null) {
                    System.out.println("Partition: " + topicPartition.partition() + ", Offset: none");
                } else {
                    System.out.println("Partition: " + topicPartition.partition() + ", Offset: " + offsetAndMetadata.offset());
                }
            }
        }
    }

    /**
     * Prints the state of {@code groupId} if the cluster lists such a group; otherwise
     * reports on stderr that the group was not found.
     */
    private static void printGroupState(AdminClient adminClient, String groupId)
            throws InterruptedException, ExecutionException {
        ListConsumerGroupsResult groupsResult = adminClient.listConsumerGroups();
        boolean groupExists = false;
        for (ConsumerGroupListing consumerGroup : groupsResult.all().get()) {
            if (consumerGroup.groupId().equals(groupId)) {
                groupExists = true;
                break;
            }
        }
        if (!groupExists) {
            System.err.println("Group: " + groupId + " not found");
            return;
        }

        try {
            ConsumerGroupDescription groupDescription = adminClient
                    .describeConsumerGroups(Collections.singleton(groupId))
                    .all().get().get(groupId);
            System.out.println("Group: " + groupId + ", State: " + groupDescription.state().toString());
        } catch (ExecutionException e) {
            System.err.println("Failed to describe group " + groupId + ": " + e.getCause());
        }
    }
}
```
这段代码会列出 Kafka 集群中所有主题的分区数量,并输出指定消费组在各分区上已提交的偏移量及其状态。注意,需要将代码中的 Kafka 集群地址和消费组 ID 修改为实际使用的值。