使用java写出消费组和主题的消费情况
时间: 2024-02-16 11:03:11 浏览: 100
可以使用Kafka提供的Java API来获取消费组和主题的消费情况。以下是示例代码:
```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaConsumerStatus {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "test-group";
private static final String TOPIC_NAME = "test-topic";
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 创建AdminClient对象
AdminClient adminClient = AdminClient.create(props);
// 获取消费组列表
ListConsumerGroupsResult cgResult = adminClient.listConsumerGroups();
for (ConsumerGroupListing cgListing : cgResult.all().get()) {
if (cgListing.groupId().equals(GROUP_ID)) {
// 获取消费组的消费情况
ConsumerGroupDescription cgDesc = adminClient.describeConsumerGroups(Arrays.asList(GROUP_ID)).all().get().get(GROUP_ID);
System.out.println("消费组:" + cgListing.groupId() + ",消费情况:" + cgDesc.state());
}
}
// 获取主题列表
ListTopicsResult topicResult = adminClient.listTopics();
TopicDescription topicDesc = null;
for (String topic : topicResult.names().get()) {
if (topic.equals(TOPIC_NAME)) {
// 获取指定主题的分区列表
topicDesc = adminClient.describeTopics(Arrays.asList(TOPIC_NAME)).all().get().get(TOPIC_NAME);
System.out.println("主题:" + topic + ",分区数:" + topicDesc.partitions().size());
}
}
// 获取指定消费组在指定主题下的消费情况
ListConsumerGroupOffsetsResult offsetResult = adminClient.listConsumerGroupOffsets(GROUP_ID);
long totalLag = 0;
for (TopicPartition tp : offsetResult.partitionsToOffsetAndMetadata().keySet()) {
long offset = offsetResult.partitionsToOffsetAndMetadata().get(tp).offset();
long endOffset = adminClient.listOffsets(
Collections.singletonMap(tp, org.apache.kafka.clients.consumer.OffsetSpec.latest())).all().get().get(tp).offset();
long lag = endOffset - offset;
System.out.println("分区:" + tp + ",offset:" + offset + ",endOffset:" + endOffset + ",lag:" + lag);
totalLag += lag;
}
System.out.println("消费组:" + GROUP_ID + ",总延迟:" + totalLag);
}
}
```
上述代码中,`BOOTSTRAP_SERVERS`、`GROUP_ID`和`TOPIC_NAME`分别为Kafka集群地址、消费组和主题名称,可以根据实际需求进行修改。
执行上述代码后,会输出消费组的消费情况、主题的分区数以及指定消费组在指定主题下的消费情况。
阅读全文