springboot kafka配置
时间: 2023-11-05 13:04:23 浏览: 95
在Spring Boot中使用Kafka需要引入以下依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
```
Kafka的配置可以在application.properties文件中进行配置,例如:
```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
```
其中,`spring.kafka.bootstrap-servers`指定了Kafka的服务器地址和端口号,`spring.kafka.consumer.group-id`指定了消费者组的ID。
此外,还可以在Java代码中进行配置,例如:
```java
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
return factory;
}
}
```
上述代码中,通过`@Value`注解注入了配置文件中的`spring.kafka.bootstrap-servers`和`spring.kafka.consumer.group-id`,并将其作为参数传入到`consumerConfigs`方法中,配置了Kafka的消费者属性。然后通过`DefaultKafkaConsumerFactory`创建了一个消费者工厂,最后通过`ConcurrentKafkaListenerContainerFactory`创建了一个Kafka监听器容器工厂,用于创建Kafka监听器容器。
阅读全文