spring boot kafka 根据 topic列表创建消费者
时间: 2023-08-08 22:02:27 浏览: 65
在Spring Boot中使用Kafka根据topic列表创建消费者可以通过以下步骤进行:
1. 在pom.xml文件中添加Kafka依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
```
2. 在application.properties文件中配置Kafka连接信息,包括Kafka服务器地址等:
```properties
spring.kafka.bootstrap-servers=<kafka服务器地址>
```
3. 创建一个消费者配置类,用于设置Kafka消费者的相关配置:
```java
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
```
4. 创建消费者类,使用@KafkaListener注解指定要订阅的topic列表,并在对应的方法中处理消费到的消息:
```java
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"topic1", "topic2", "topic3"})
public void consumeMessage(ConsumerRecord<String, String> record) {
// 处理接收到的消息
System.out.println("收到消息:" + record.value());
}
}
```
通过以上步骤,我们就可以在Spring Boot中根据topic列表创建消费者。可以根据实际需求修改配置和消费逻辑,来处理不同的消息。