java 手动提交kafka
时间: 2023-07-08 17:54:45 浏览: 120
Java 手动提交 Kafka 消费者的方式有两种:同步提交和异步提交。
1. 同步提交:
```java
properties.put("enable.auto.commit", false); // 禁用自动提交 offset
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 消费消息
}
consumer.commitSync(); // 手动同步提交 offset
}
} finally {
consumer.close();
}
```
2. 异步提交:
```java
properties.put("enable.auto.commit", false); // 禁用自动提交 offset
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 消费消息
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
// 异常处理
}
}
}); // 手动异步提交 offset
}
} finally {
consumer.close();
}
```
需要注意的是,手动提交 offset 需要在正确处理完消息之后再提交,否则可能会导致消息重复消费或者丢失。同时,如果使用同步提交,在提交 offset 失败时会一直重试直到提交成功,而异步提交则不会重试,需要自行处理提交失败的情况。
阅读全文