Kafka Deployment and Spring Boot Configuration
Date: 2024-12-28 15:26:32
### Integrating Spring Boot with Kafka
Integrating Apache Kafka into a Spring Boot application is straightforward with the `spring-kafka` library, which provides auto-configuration plus high-level templates and listener support for producing and consuming messages.
#### Adding the Dependency
First, add the required dependency to your build file (Maven or Gradle). With the Spring Boot parent POM or dependency-management plugin, the version is resolved automatically.
For a Maven `pom.xml`:
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
For a Gradle `build.gradle`:
```groovy
implementation 'org.springframework.kafka:spring-kafka'
```
#### Application Properties
Next, define the parameters needed to connect to the Kafka cluster in `application.properties` or `application.yml`[^1].
If you use the properties format, add the following to `src/main/resources/application.properties`:
```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```
If you prefer YAML, place the equivalent configuration in `src/main/resources/application.yml`:
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
#### Creating Producer and Consumer Service Classes
Next, create the service components that send and receive messages. The minimal examples below show how to write this logic[^2].
**ProducerService.java**
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends a message to the given topic; the send is asynchronous
    public void sendMessage(String topicName, String message) {
        this.kafkaTemplate.send(topicName, message);
    }
}
```
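As a quick way to exercise the producer, you could expose it through a REST endpoint. The controller below is a hypothetical sketch: the `KafkaController` name and `/kafka/publish` path are illustrative choices, not part of the original article.

```java
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final ProducerService producerService;

    // Constructor injection of the ProducerService defined above
    public KafkaController(ProducerService producerService) {
        this.producerService = producerService;
    }

    // POST /kafka/publish?topic=test-topic with the message text in the request body
    @PostMapping("/publish")
    public String publish(@RequestParam String topic, @RequestBody String message) {
        producerService.sendMessage(topic, message);
        return "Message queued for topic " + topic;
    }
}
```

You could then test end to end with `curl -X POST "localhost:8080/kafka/publish?topic=test-topic" -d "hello"` while watching the consumer's log output.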
**KafkaConsumerConfig.java and ConsumerListener.java**
Configure the listener container factory and handle incoming data:
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Named KafkaConsumerConfig (not ConsumerConfig) to avoid clashing with
// org.apache.kafka.clients.consumer.ConsumerConfig, whose constants are used below
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
```
And in a separate file, the listener itself:
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {

    // Invoked for each record received on test-topic
    @KafkaListener(topics = "test-topic", groupId = "my-group")
    public void listen(String data) {
        System.out.println("Received message in group: " + data);
    }
}
```
The snippets above cover the essential parts of producing and consuming messages. Real projects usually need more, such as error-retry mechanisms and transaction support[^3].
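As one example of an error-retry mechanism, Spring Kafka's `DefaultErrorHandler` can be attached to the listener container factory. The sketch below is illustrative: the two-retry count and one-second back-off are arbitrary values, not recommendations from the original article.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    // Retry a failed record twice, waiting 1 second between attempts;
    // after that the record is logged and skipped by default
    @Bean
    public DefaultErrorHandler kafkaErrorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
    }
}
```

When you build the container factory yourself, as above, wire the handler in with `factory.setCommonErrorHandler(kafkaErrorHandler())`; Spring Boot's auto-configured factory can also pick up a `CommonErrorHandler` bean automatically.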