kafka java动态获取topic并动态创建消费者
时间: 2024-01-29 18:04:33 浏览: 149
在Java中使用Kafka消费者,可以使用Consumer API。您可以使用Consumer API来动态获取topic并创建消费者。
以下是一个示例代码片段:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 获取topic列表
List<String> topics = Arrays.asList("topic1", "topic2");
// 动态添加订阅的topic
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
```
您可以通过将新的topic添加到topics列表中来动态获取topic。然后,调用subscribe()方法来订阅这些topic。
此示例还演示了如何使用Kafka消费者API消费消息。在while循环中,调用poll()方法以获取消息。然后,遍历ConsumerRecords对象以访问每条消息。
阅读全文