spring集成kafka消费者监听_SpringBoot开发案例之整合Kafka实现消息队列
时间: 2023-11-27 13:07:47 浏览: 34
好的,下面是一个简单的示例,演示如何使用Spring Boot集成Kafka消费者监听。
首先,在 `pom.xml` 文件中添加以下依赖项:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
```
接下来,创建一个Kafka消费者配置类,实现 `ConsumerConfig` 接口:
```java
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
```
在上面的代码中,我们定义了一些Kafka消费者的配置,例如 `bootstrap.servers` 和 `group.id` 等。然后我们创建了一个 `ConsumerFactory` 和 `ConcurrentKafkaListenerContainerFactory` 的bean,用于创建Kafka消费者和监听器。
接下来,我们创建一个Kafka消费者监听器,用于接收和处理消息:
```java
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "my-topic")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
```
在上面的代码中,我们使用 `@KafkaListener` 注解来标记这是一个Kafka消费者监听器,`topics` 参数指定要监听的Kafka主题名称。然后我们定义了一个 `listen` 方法来处理接收到的消息。
最后,我们需要在 `application.properties` 文件中配置Kafka的连接信息:
```properties
spring.kafka.bootstrap-servers=localhost:9092
```
现在,我们已经完成了Spring Boot集成Kafka消费者监听的配置和实现。当Kafka主题中有新的消息时,Kafka消费者监听器会自动接收并处理消息。