待回调函数的亦不发送回调函数在producer收到ack时调用为
时间: 2023-10-26 14:25:27 浏览: 38
这个问题涉及到 Kafka 中生产者发送消息时的确认机制。在 Kafka 中,生产者发送消息时需要等待消息被服务器确认后才能确定消息发送成功。确认机制可以通过设置生产者参数 acks 来控制,acks 参数有三种取值:
- acks=0,生产者不等待服务器的任何确认,直接发送下一条消息。
- acks=1,生产者等待服务器的确认,只要 Leader 副本确认接收到消息,就认为消息发送成功。
- acks=all,生产者等待所有副本的确认,只有当所有副本都确认接收到消息,才认为消息发送成功。
当生产者发送消息后等待服务器确认时,可以通过设置回调函数来处理确认结果。如果设置了回调函数,在生产者接收到消息发送成功的确认后,会自动调用回调函数进行处理。如果没有设置回调函数,在生产者接收到消息发送成功的确认后,也会进行默认的处理,例如打印日志等。
需要注意的是,如果生产者设置的 acks 参数为 0,不等待服务器的任何确认,那么也不会触发回调函数,生产者将直接发送下一条消息。
相关问题
kafka 设置了ack应答后,回调函数的意义
当Kafka Producer 发送消息时,可以设置 acks 参数来控制确认消息是否已经成功写入了Broker。当 acks 参数设置为 all 或 -1 时,表示 Producer 等待所有备份都完成后才会收到一个确认。在这种情况下,回调函数的意义是当所有备份都已成功写入 Broker 后,Kafka Producer 会调用回调函数来通知 Producer 发送消息成功。回调函数中可以对消息发送的成功与否进行处理,例如打印日志或进行下一步操作等。如果 acks 参数设置为 0 或 1,则不需要等待所有备份完成,因此不需要使用回调函数。
springboot集成kafka,并在生产者异步发送消息且配置ack,并设置回调函数
首先,需要在pom.xml文件中添加kafka的依赖:
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.10.RELEASE</version>
</dependency>
```
接着,在application.properties文件中配置kafka的相关信息:
```properties
#kafka配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
```
其中,`bootstrap-servers`表示kafka的地址和端口,`acks`表示消息确认模式,`retries`表示消息发送失败后的重试次数,`batch-size`表示每次批量发送消息的大小,`buffer-memory`表示消息缓存区的大小。
接下来,在配置类中创建KafkaTemplate和生产者回调函数:
```java
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
return props;
}
@Bean
public KafkaProducerListener<String, String> kafkaProducerListener() {
return new KafkaProducerListener<>();
}
}
class KafkaProducerListener<K, V> implements ProducerListener<K, V> {
@Override
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
System.out.println("消息发送成功:" + producerRecord.toString());
}
@Override
public void onError(ProducerRecord<K, V> producerRecord, Exception exception) {
System.out.println("消息发送失败:" + producerRecord.toString() + ", " + exception.getMessage());
}
}
```
在上面的代码中,我们创建了一个`KafkaTemplate`,并通过`DefaultKafkaProducerFactory`将其与`ProducerConfig`绑定。然后,我们实现了一个生产者回调函数`KafkaProducerListener`,其中`onSuccess`方法在消息发送成功后被调用,`onError`方法在消息发送失败后被调用。
最后,在消息发送时,我们可以通过`KafkaTemplate`的`send`方法异步发送消息,并传入回调函数:
```java
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("发送消息成功:" + result.toString());
}
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:" + ex.getMessage());
}
});
}
}
```
如上所示,我们通过`kafkaTemplate.send`方法异步发送消息,并在`ListenableFuture`的回调函数中处理发送结果。