kafka 自动提交offset+deomo
时间: 2023-07-01 13:23:46 浏览: 217
好的,针对你的问题,我可以为您提供以下的解答和代码示例:
Kafka 自动提交 Offset 是指消费者在消费消息时,会自动将该消费者最近一次消费的消息偏移量(Offset)提交到 Kafka 中。这种方式可以减轻开发人员的负担,但也可能会导致数据重复消费或漏消费的问题。因此,自动提交 Offset 应该谨慎使用。
下面是一个使用 Kafka 自动提交 Offset 的示例代码:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaAutoCommitOffsetDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 开启自动提交 Offset
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
}
```
在上面的代码中,我们通过设置 `ENABLE_AUTO_COMMIT_CONFIG` 属性为 `true` 来开启自动提交 Offset。这样,消费者在每次调用 `poll()` 方法获取消息时,都会把最近一次消费的消息偏移量提交到 Kafka 中。
需要注意的是,自动提交 Offset 的时间间隔可以通过 `auto.commit.interval.ms` 属性进行配置,默认值为 5000 毫秒。如果想要更加精细地控制 Offset 的提交,可以将 `ENABLE_AUTO_COMMIT_CONFIG` 属性设置为 `false`,然后在代码中手动调用 `commitSync()` 或 `commitAsync()` 方法提交 Offset。
希望以上的代码示例和解答能够对您有所帮助。
阅读全文