spring boot 如何监听kafka消息代码
时间: 2024-01-08 22:03:54 浏览: 104
可以使用Spring Kafka提供的@KafkaListener注解来监听Kafka消息。以下是示例代码:
首先,需要在Spring Boot应用程序中添加以下依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.5</version>
</dependency>
```
接下来,在配置类中配置Kafka相关属性:
```java
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
```
在上面的代码中,我们使用@Value注解获取了Spring Boot应用程序中配置的Kafka服务器地址,并配置了Kafka消费者的相关属性。
接下来,我们可以创建一个Kafka消息监听器:
```java
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
```
在上面的代码中,我们使用@KafkaListener注解来指定要监听的Kafka主题,然后定义了一个receiveMessage方法来处理接收到的消息。
最后,我们需要在应用程序的启动类上添加@EnableKafka注解来启用Kafka监听器:
```java
@SpringBootApplication
@EnableKafka
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
```
这就是使用Spring Kafka来监听Kafka消息的示例代码。
阅读全文