How to add Kafka to a Spring Boot project
Date: 2023-12-20 17:05:15 Views: 128
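To use Kafka in a Spring Boot project, add the Spring for Apache Kafka (`spring-kafka`) dependency to pom.xml. The coordinates are listed on Maven Central and on the Spring website. If the project inherits from `spring-boot-starter-parent`, the `<version>` element can usually be omitted so that Spring Boot manages a compatible version. Add the following dependency to pom.xml: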
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>
```
Configure the Kafka connection, serializers, and consumer group in application.yml, for example:
```yaml
spring:
  kafka:
    bootstrap-servers: 192.168.1.113:9048
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: MyGroup1
```
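The `spring.kafka.*` keys above are Spring Boot's names for the standard Kafka client properties (`bootstrap.servers`, `key.serializer`, `group.id`, and so on). As a rough illustration of that mapping, the sketch below builds the equivalent `java.util.Properties` objects using only the JDK. The class `KafkaClientProps` and its method names are hypothetical helpers, not part of Spring or Kafka; the property keys and class names are the ones from the YAML.

```java
import java.util.Properties;

// Hypothetical helper: mirrors the application.yml settings as plain Kafka client properties.
final class KafkaClientProps {

    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Equivalent of spring.kafka.bootstrap-servers plus spring.kafka.producer.*
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", STRING_SERIALIZER);
        props.setProperty("value.serializer", STRING_SERIALIZER);
        return props;
    }

    // Equivalent of spring.kafka.bootstrap-servers plus spring.kafka.consumer.*
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        // Same values as in the YAML example.
        System.out.println(producerProps("192.168.1.113:9048"));
        System.out.println(consumerProps("192.168.1.113:9048", "MyGroup1"));
    }
}
```

In a Spring Boot application these objects are not needed, since the auto-configuration builds the producer and consumer factories from the YAML; the sketch only shows which client settings the YAML keys correspond to.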
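In code, inject a `KafkaTemplate` and use it to send messages. The example below sends to a fixed partition with an explicit key: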
```java
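@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@PostMapping("test")
public RestResult test() {
    // send(topic, partition, key, value): writes to partition 0 of TOPIC_NAME.
    kafkaTemplate.send("TOPIC_NAME", 0, "key", "this is a message");
    // RestResult is this project's own response wrapper, not a Spring class.
    return RestResult.wrapSuccessResponse();
}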
```
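The controller above ignores the value returned by `send`. The call is asynchronous and returns a future (a `ListenableFuture` in spring-kafka 2.x, a `CompletableFuture` in 3.x), so a failed send goes unnoticed unless that future is inspected. The sketch below shows the general pattern with a plain JDK `CompletableFuture`; `fakeSend` is a hypothetical stand-in for `KafkaTemplate.send` so that the example runs without a broker, and `SendResultSketch` is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;

final class SendResultSketch {

    // Hypothetical stand-in for KafkaTemplate.send(...); NOT a spring-kafka API.
    // Completes normally for a non-empty topic and exceptionally otherwise.
    static CompletableFuture<String> fakeSend(String topic, String value) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (topic == null || topic.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("topic is required"));
        } else {
            future.complete(topic + ":" + value);
        }
        return future;
    }

    // Maps the asynchronous outcome to a short status string.
    // join() blocks and is used here only to keep the demo simple.
    static String sendAndDescribe(String topic, String value) {
        return fakeSend(topic, value)
                .handle((result, error) ->
                        error == null ? "sent " + result : "failed: " + error.getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(sendAndDescribe("TOPIC_NAME", "this is a message"));
        System.out.println(sendAndDescribe("", "this is a message"));
    }
}
```

With spring-kafka 2.7.x the same idea would be expressed through the callback methods of the returned `ListenableFuture` rather than `handle`; in a real controller it is usually better to log or record the failure in the callback than to block the request thread.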
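To consume messages, annotate a method with `@KafkaListener`. Because the method below takes a `List` of records and an `Acknowledgment`, the listener container must be set up for batch listening with a manual ack mode (in Spring Boot, for example, `spring.kafka.listener.type: batch` and `spring.kafka.listener.ack-mode: manual`); the configuration shown earlier does not set these. For example: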
```java
// `log` is assumed to be an SLF4J logger, e.g. from Lombok's @Slf4j on the class.
@KafkaListener(topics = "TOPIC_NAME", groupId = "MyGroup1", containerFactory = "kafkaListenerContainerFactory")
public void kafkaListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    for (ConsumerRecord<?, ?> item : records) {
        log.info("topic is : {}, offset is : {}, partition is : {}, value is : {}",
                item.topic(), item.offset(), item.partition(), item.value());
    }
    // Acknowledge only after the whole batch has been processed.
    ack.acknowledge();
}
```