kafka java动态获取topic并动态创建消费者
时间: 2024-01-29 18:04:46 浏览: 66
可以使用 Kafka 的 AdminClient API 动态获取 topic 列表,并使用 KafkaConsumer API 动态创建消费者。
首先,使用 AdminClient API 获取所有的 topic 列表:
```
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(props);
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true); // 包括内部 topic
List<String> topicList = adminClient.listTopics(options).names().get();
```
然后,使用 KafkaConsumer API 动态创建消费者:
```
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 动态订阅 topic
for (String topic : topicList) {
consumer.subscribe(Collections.singletonList(topic));
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
```
注意,这里动态订阅的 topic,需要在 Kafka 集群中已经存在。如果要动态创建 topic,可以使用 AdminClient API 的 `createTopics()` 方法。例如:
```
NewTopic newTopic = new NewTopic("test-topic", 3, (short) 1);
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
result.all().get(); // 等待创建完成
```