kafka 异步发送,回调里抛异常 主线程可以接收到吗
时间: 2024-09-20 15:08:16 浏览: 33
spring boot整合spring-kafka实现发送接收消息实例代码
在 Apache Kafka 中,异步发送消息通常意味着你不在发送请求的线程上等待响应。Kafka 生产者提供了异步 API(如 `send()` 或 `sendAsync()`),当你调用这些方法时,生产者会返回立即,而不是阻塞直到消息实际被投递到服务器。这个过程通常是这样的:
1. **发送请求**:你调用异步发送方法,传递消息及其配置信息。
2. **设置回调**:如果你提供了回调函数(如 CompletionHandler),生产者会在消息成功写入日志后调用这个函数,或者如果出现错误,则在回调中抛出异常。
3. **处理回调**:回调发生在生产者的线程上,不是你的主线程。这意味着如果在回调中抛出了异常,主线程并不会直接接收到这个异常,除非你主动捕获并传播。
为了确保主线程能够捕获到异步发送的异常,你需要在回调中适当地处理它。通常的做法是在回调中:
- 捕获异常
- 将异常包装成一个新的 `Future` 对象或者自定义的 Promise 对象
- 使用 Future 的 `completeExceptionally()` 或 Promise 的 `reject()` 方法将异常标记为未完成
- 主线程通过检查这个 Future 或 Promise 是否已完成(即有异常)来判断并处理
示例代码:
```java
Future<Void> future = producer.sendAsync(record, newCompletionHandler(...));
future.whenComplete((success, exception) -> {
if (exception != null) {
throw new RuntimeException("Failed to send message", exception);
}
});
```
或者,使用 Try-with-resources:
```java
try (AutoCloseable ignored = future) {
future.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to send message", e.getCause());
}
```
阅读全文