Spring Boot 集成 Kafka,向 Kafka 发送数据并保证消息零丢失,请给出 Maven 依赖配置以及详细的 Java 代码
时间: 2024-01-16 20:44:35 浏览: 140
spring boot的maven配置依赖详解
Maven依赖配置:
```
<!-- Spring for Apache Kafka: supplies the org.springframework.kafka.* classes
     (KafkaTemplate, DefaultKafkaProducerFactory, JsonSerializer) imported by the Java code below. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!-- NOTE(review): version is pinned explicitly; if the project inherits Spring Boot's
     dependency management this tag can presumably be omitted so the versions stay aligned. Confirm. -->
<version>2.8.0</version>
</dependency>
```
Java代码:
```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
 * Publishes messages to Kafka with producer settings chosen so that a message is
 * either acknowledged by the cluster or reported to the caller as failed.
 *
 * <p>The producer is configured with {@code acks=all}, idempotence enabled and
 * effectively unlimited retries, and {@link #sendMessage(String, Object)} waits for
 * the broker acknowledgement instead of firing and forgetting.
 *
 * <p>Producer settings alone cannot guarantee durability: the topic's replication
 * factor and {@code min.insync.replicas} on the broker side must also be set
 * appropriately.
 */
@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Builds a {@link KafkaTemplate} backed by a producer that targets
     * {@code localhost:9092}, with String keys and JSON-serialized values.
     */
    public KafkaProducerService() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Require acknowledgement from all in-sync replicas before a send counts as successful.
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        // Idempotent producer: retries cannot introduce duplicates or reorder records.
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Keep retrying transient failures until the producer's delivery timeout expires.
        configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // Maximum number of unacknowledged requests per connection.
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
        kafkaTemplate = new KafkaTemplate<>(producerFactory);
    }

    /**
     * Sends {@code message} to {@code topic} and blocks until the send completes.
     *
     * <p>Returning normally means the send future completed successfully. Any failure
     * is surfaced to the caller so the message is never dropped silently.
     *
     * @param topic   destination topic name
     * @param message payload, serialized with {@link JsonSerializer}
     * @throws IllegalStateException if the send fails or the calling thread is
     *                               interrupted while waiting for the acknowledgement
     */
    public void sendMessage(String topic, Object message) {
        try {
            // The returned future completes when the producer reports success or a
            // terminal failure; get() turns an asynchronous failure into an exception.
            kafkaTemplate.send(topic, message).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                    "Interrupted while waiting for Kafka to acknowledge a message for topic " + topic, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(
                    "Kafka did not acknowledge the message for topic " + topic, e);
        }
    }
}
```
使用示例:
```java
/**
 * Usage example: a Spring service that publishes a message through
 * {@link KafkaProducerService}.
 */
@Service
public class MyService {

    /** Topic that {@link #doSomething()} publishes to. */
    private static final String TOPIC = "my-topic";

    private final KafkaProducerService producer;

    /**
     * @param kafkaProducerService producer service used to publish messages
     */
    public MyService(KafkaProducerService kafkaProducerService) {
        this.producer = kafkaProducerService;
    }

    /**
     * Publishes a single greeting message to {@code my-topic}.
     * NOTE(review): {@code MyMessage} is not defined in this snippet; it is presumably
     * a JSON-serializable payload class. Confirm.
     */
    public void doSomething() {
        producer.sendMessage(TOPIC, new MyMessage("hello, kafka!"));
    }
}
```
阅读全文