kafka手动提交offset代码
时间: 2023-06-11 07:09:14 浏览: 124
以下是使用Java语言手动提交Kafka offset的示例代码:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交offset
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync(); // 手动提交offset
}
} finally {
consumer.close();
}
}
}
```
在上面的示例代码中,通过设置`enable.auto.commit`属性为`false`,关闭了Kafka Consumer的自动提交offset功能。在消费完一批消息后,调用`consumer.commitSync()`方法手动提交offset。这样可以更灵活地控制offset的提交,避免消息重复消费或漏消费的问题。
阅读全文