springboot集成kafka,并在生产者异步发送消息且配置ack,并设置回调函数
时间: 2024-05-07 07:20:47 浏览: 82
首先,需要在pom.xml文件中添加kafka的依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.10.RELEASE</version>
</dependency>
```
接着,在application.properties文件中配置kafka的相关信息:
```properties
#kafka配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
```
其中,`bootstrap-servers`表示kafka的地址和端口,`acks`表示消息确认模式,`retries`表示消息发送失败后的重试次数,`batch-size`表示每次批量发送消息的大小,`buffer-memory`表示消息缓存区的大小。
接下来,在配置类中创建KafkaTemplate和生产者回调函数:
```java
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
return props;
}
@Bean
public KafkaProducerListener<String, String> kafkaProducerListener() {
return new KafkaProducerListener<>();
}
}
class KafkaProducerListener<K, V> implements ProducerListener<K, V> {
@Override
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
System.out.println("消息发送成功:" + producerRecord.toString());
}
@Override
public void onError(ProducerRecord<K, V> producerRecord, Exception exception) {
System.out.println("消息发送失败:" + producerRecord.toString() + ", " + exception.getMessage());
}
}
```
在上面的代码中,我们创建了一个`KafkaTemplate`,并通过`DefaultKafkaProducerFactory`将其与`ProducerConfig`绑定。然后,我们实现了一个生产者回调函数`KafkaProducerListener`,其中`onSuccess`方法在消息发送成功后被调用,`onError`方法在消息发送失败后被调用。
最后,在消息发送时,我们可以通过`KafkaTemplate`的`send`方法异步发送消息,并传入回调函数:
```java
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("发送消息成功:" + result.toString());
}
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:" + ex.getMessage());
}
});
}
}
```
如上所示,我们通过`kafkaTemplate.send`方法异步发送消息,并在`ListenableFuture`的回调函数中处理发送结果。
阅读全文