Java kafka发送消息回调
时间: 2024-09-21 22:01:02 浏览: 49
在Java中,Apache Kafka 提供了一套完整的API和工具,用于生产者(Producer)将数据发布到主题(Topic),消费者(Consumer)订阅并消费这些消息。当你作为生产者发送消息时,Kafka通常不会直接提供消息发送后的回调函数。然而,你可以通过监听`org.apache.kafka.clients.consumer.ConsumerRecords`对象来间接获得消息处理的结果。
当生产者成功发送消息时,它会返回一个`Future<RecordMetadata>`,该对象表示消息已经被添加到了分区中。虽然这个future不代表消息已被所有消费者消费,但它可以作为一个异步操作的标识。如果你想得到消息确认或者处理错误,你需要自行设置一个回调机制:
```java
producer.send(record)
.thenAcceptAsync(metadata -> {
// 消息发送成功处理
System.out.println("Message sent to topic " + record.topic() + " partition " + metadata.partition());
}, executorService);
```
这里,`executorService`是一个线程池,用于异步处理回调。如果发送失败,未来可能会抛出异常,这时可以在catch块中捕获并处理错误。
对于消费者的回调,Kafka提供了一个`ConsumerRecordListener`接口,消费者注册这个接口的实例到特定的主题,每当有新的消息到达时,就会触发相应的事件处理器。
```java
MyEventListener listener = new MyEventListener();
consumer.assign(List.of(topic));
consumer.subscribe(Arrays.asList(topic), listener);
// 实现 ConsumerRecordListener
class MyEventListener implements ConsumerRecordListener<String, String> {
@Override
public void onPartitionsRevoked(String topic, Collection<int[]> partitions) {}
@Override
public void onPartitionsAssigned(Map<String, Collection<int[]>> assignments) {}
@Override
public void records_consumed(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
// 消息消费处理
processRecord(record);
}
}
private void processRecord(ConsumerRecord<String, String> record) {
//...
}
}
```
在这个例子中,`records_consumed`方法会在每个新记录到达时被调用。
阅读全文