KafkaConsumer 怎么监听多个topic并且poll的时候只poll指定的topic
时间: 2024-03-19 10:46:14 浏览: 53
scala编写,以spark stream方式监听消费kafka的topic,同步写入elasticsearch
KafkaConsumer 可以监听多个 topic,可以通过在配置参数中设置 `ConsumerConfig.TOPIC_CONFIG` 参数来指定需要监听的 topic 列表。例如:
```java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.TOPIC_CONFIG, "topic1,topic2,topic3"); // 监听多个 topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```
在使用 `poll` 方法时,只需要调用一次,KafkaConsumer 就会从所有的 topic 中获取数据。如果你只想从指定的 topic 中获取数据,可以在调用 `poll` 方法时指定 `TopicPartition` 参数。例如:
```java
List<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if ("topic1".equals(record.topic())) { // 只处理指定 topic 的数据
// 处理数据
}
}
```
在处理数据时,可以通过 `record.topic()` 方法获取当前数据所属的 topic,然后判断是否是需要的 topic,从而实现只消费指定的 topic。
阅读全文