kafka java动态获取topic并动态创建消费者
时间: 2024-01-29 16:04:24 浏览: 257
SpringKafka 动态配置监听消费 Example Code
首先,你需要使用 Kafka 的 Java 客户端 API 来实现动态获取 topic 和创建消费者的功能。可以使用一个配置文件或者数据库来存储可用的 topic 名称列表,然后在程序启动时读取该列表。
接下来,使用 Kafka 的 Java 客户端 API 来创建一个消费者对象,并订阅你需要消费的 topic。可以使用 KafkaConsumer 类来创建消费者对象。
下面是一个简单的代码示例:
```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.*;
public class DynamicConsumer {
public static void main(String[] args) throws Exception {
// 读取 topic 列表
List<String> topics = readTopicsFromConfig();
// 创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig());
// 动态订阅 topic
consumer.subscribe(topics);
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
}
// 从配置文件或数据库中读取 topic 列表
private static List<String> readTopicsFromConfig() {
List<String> topics = new ArrayList<>();
// TODO: 从配置文件或数据库中读取 topic 列表
return topics;
}
// 获取消费者配置
private static Properties getConsumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
}
```
在上面的代码中,`readTopicsFromConfig()` 方法可以从配置文件或数据库中读取可用的 topic 列表。`getConsumerConfig()` 方法返回一个包含消费者配置的 Properties 对象,可以根据实际情况进行修改。
最后,在 `consumer.subscribe(topics)` 中动态订阅可用的 topic,然后使用 `consumer.poll()` 方法来消费消息。
阅读全文