使用java 代码 输出显示 kafka 消费组下的所有主题
时间: 2024-02-03 10:14:11 浏览: 67
可以使用 Kafka Java 客户端库中的 AdminClient 类来获取消费组下的所有主题,以下是示例代码:
```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.Collection;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
public class KafkaConsumerGroupTopics {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
String groupId = "my-consumer-group";
try (AdminClient client = AdminClient.create(props)) {
// 获取消费组信息
KafkaFuture<ConsumerGroupDescription> groupFuture = client.describeConsumerGroups(Set.of(groupId)).all().get();
ConsumerGroupDescription groupDescription = groupFuture.all().get();
// 获取消费组下的所有主题
Collection<TopicPartitionInfo> topicPartitionInfos = groupDescription.partitions().stream()
.map(topicPartition -> {
try {
return client.describeTopics(Set.of(topicPartition.topic())).values().get(topicPartition.topic()).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})
.flatMap(topicDescription -> topicDescription.partitions().stream())
.collect(Collectors.toList());
Set<String> topics = topicPartitionInfos.stream()
.map(TopicPartitionInfo::topic)
.collect(Collectors.toSet());
System.out.println("Consumer group " + groupId + " subscribed to topics: " + topics);
}
}
}
```
上述代码中,我们首先创建了一个 AdminClient 对象,并指定了 Kafka 集群的地址。然后,我们通过 `describeConsumerGroups` 方法获取消费组的描述信息,再通过 `describeTopics` 方法获取每个主题的详细信息。最后,我们根据主题信息获取消费组订阅的所有主题,并将它们输出到控制台。
你需要将 `bootstrap.servers` 的值替换为你的 Kafka 集群的地址,并将 `groupId` 替换为你要查询的消费组的 ID。
阅读全文