spring集成kafka消费者监听_SpringBoot开发案例之整合Kafka实现消息队列
时间: 2023-11-27 10:09:48 浏览: 61
spring boot整合spring-kafka实现发送接收消息实例代码
好的,我可以为您提供关于Spring集成Kafka消费者监听的解决方案。
首先,在pom.xml文件中添加以下依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
```
接下来,创建一个Kafka配置类,用于配置Kafka的相关属性,如下所示:
```java
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
```
在上面的代码中,我们配置了Kafka的bootstrap.servers、key.deserializer、value.deserializer、group.id等属性。同时,我们还定义了一个ConcurrentKafkaListenerContainerFactory,用于创建监听Kafka消息的容器。
最后,我们可以创建一个Kafka消息监听器,用于监听Kafka中的消息。示例代码如下:
```java
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "test-topic")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received message: " + record.value());
}
}
```
在上面的代码中,我们使用@KafkaListener注解指定要监听的Kafka主题。当有消息到达时,listen方法将会被调用。
以上就是Spring集成Kafka消费者监听的解决方案。希望能帮到您!
阅读全文