基于 ConcurrentKafkaListenerContainerFactory 创建 KafkaListener
时间: 2024-01-12 09:05:27 浏览: 95
基于 ConcurrentKafkaListenerContainerFactory 创建 KafkaListener 的步骤如下:
1. 创建一个 KafkaListenerContainerFactory 实例,可以使用 ConcurrentKafkaListenerContainerFactory。
2. 设置 KafkaListenerContainerFactory 的属性,例如:bootstrap.servers、group.id、auto.offset.reset 等。
3. 创建一个 KafkaListener 方法,并使用 @KafkaListener 注解来指定监听的 topic。
4. 在方法中处理接收到的消息。
示例代码如下:
```java
@Configuration
@EnableKafka
public class KafkaListenerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@KafkaListener(topics = "test-topic")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received message: " + record.value());
}
}
```
在以上示例代码中,我们创建了一个 ConcurrentKafkaListenerContainerFactory 实例,并设置了其属性。
然后,我们创建了一个 KafkaListener 方法,使用 @KafkaListener 注解来指定监听的 topic,当接收到消息时,会执行该方法。
最后,在 KafkaListener 方法中,我们处理接收到的消息。
阅读全文