kafka java动态获取topic并动态创建消费者
时间: 2024-01-30 20:02:11 浏览: 186
SpringKafka 动态配置监听消费 Example Code
可以使用 Kafka 的 Java API 来动态获取 topic 并动态创建消费者。以下是一个示例代码:
```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class DynamicConsumer {
public static void main(String[] args) {
// Kafka broker 地址
String bootstrapServers = "localhost:9092";
// 消费者组 ID
String groupId = "test-group";
// 消费者配置
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Kafka 消费者
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
// 动态获取的 topic 名称
String topic = "test-topic";
// 创建消费者订阅的 topic
consumer.subscribe(Collections.singletonList(topic));
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
```
在上面的代码中,我们首先创建一个 Kafka 消费者并设置必要的配置项。然后,我们动态获取要消费的 topic 名称,并将其订阅给消费者。最后,我们使用 `poll` 方法从 Kafka 中获取消息并进行消费。在上述示例中,我们使用了一个无限循环来持续消费消息。你可以根据自己的需求更改代码来增加其他功能,例如将消费者动态创建为多线程消费者等。
阅读全文