Kafka常见问题整理常见问题整理
1、Kafka如何防止数据丢失
1)消费端弄丢数据)消费端弄丢数据
消费者在消费完消息之后需要执行消费位移的提交,该消费位移表示下一条需要拉取的消息的位置。Kafka默认位移提交方式
是自动提交,但它不是在你每消费一次数据之后就提交一次位移,而是每隔5秒将拉取到的每个分区中的最大的消费位移进行
提交。自动位移提交在正常情况下不会发生消息丢失或重复消费的现象,唯一可能的情况,你拉取到消息后,消费者那边刚好
进行了位移提交,Kafka那边以为你已经消费了这条消息,其实你刚开始准备对这条消息进行业务处理,但你还没处理完,然
后因为某些原因,自己挂掉了,当你服务恢复后再去消费,那就是消费下一条消息了,那么这条未处理的消息就相当于丢失
了。所以,很多时候并不是说拉取到消息就算消费完成,而是将消息写入数据库或缓存中,或者是更加复杂的业务处理,在这
些情况下,所有的业务处理完成才能认为消息被成功消费。Kafka也提供了对位移提交进行手动提交的方式,开启手动提交的
前提是消费者客户端参数enable.auto.commit配置为false,
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
消费者端手动提交方式提供了两种,commitSync()同步提交方式和commitAsync()异步提交方式。commitSync()同步提交方
式在调用时Consumer程序会处于阻塞状态,直到远端的broker返回提交结果,这个状态才会结束,这样会对消费者的性能有
一定的影响。commitAsync()异步提交方式在执行后会立刻返回,不会被阻塞,但是它也有相应的问题产生,如果异步提交失
败后,它虽然也有重试,但是重试提交的位移值可能早已经“过期”或者不是最新的值了,因此异步提交的重试其实没有意义。
这里我们可以把同步提交和异步提交相结合,以达到最理想的效果。
try {
while (true) {
ConsumerRecords records = consumer.poll(1000);
for (ConsumerRecord record : records) {
// 处理消息 record
}
consumer.commitAsync();
}
} catch (Exception e){
// 处理异常
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
2)Kafka端弄丢数据端弄丢数据
如下图,副本A为leader副本,副本B为follower副本,它们的HW和LEO都为4。
image
此时,A中写入一条消息,它的LEO更新为5,B从A中同步了这条数据,自己的LEO也更新为5
image
之后B再向A发起请求以拉取数据,该FetchRequest请求中带上了B中的LEO信息,A在收到该请求后根据B的LEO值更新了自
己的HW为5,A中虽然没有更多的消息,但还是在延时一段时间之后返回FetchRresponse,其中也包含了HW信息,最后B根
据返回的HW信息更新自己的HW为5。
image
可以看到整个过程中两者之间的HW同步有一个间隙,B在同步A中的消息之后需要再一轮的FetchRequest/FetchResponse才
能更新自身的HW为5。如果在更新HW之前,B宕机了,那么B在重启之后会根据之前HW位置进行日志截断,这样便会将4这
条消息截断,然后再向A发送请求拉取消息。此时若A再宕机,那么B就会被选举为新的leader。B恢复之后会成为follower,由
于follower副本的HW不能比leader副本的HW高,所以还会做一次日志截断,以此将HW调整为4。这样一来4这条数据就丢失
了(就算A不能恢复,这条数据也同样丢失了)。
image
评论10