1.3 拦截器
1.3.1 拦截器介绍
Producer 的拦截器(Interceptor)和 Consumer 的 Interceptor 主要⽤于实现 Client 端的定
制化控制逻辑。
对于 Producer⽽⾔,Interceptor 使得⽤户在消息发送前以及 Producer 回调逻辑前有机会对
消息做⼀些定制化需求,⽐如修改消息等。同时,Producer 允许⽤户指定多个 Interceptor 按
序作⽤于同⼀条消息从⽽形成⼀个拦截链(Interceptor Chain)。Intercetpor 的实现接⼝是
org.apache.kafka.clients.producer.ProducerInterceptor,其定义的⽅法包括:
onSend(ProducerRecord):该⽅法封装进 KafkaProducer.send⽅法中,即运⾏在⽤户主线程
中。Producer 确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做
任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响⽬标分区的计算。
onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送
失败时调⽤,并且通常都是在 Producer 回调逻辑触发之前。onAcknowledgement 运⾏在
Producer 的 IO 线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢 Producer 的消息
发送效率。
close:关闭 Interceptor,主要⽤于执⾏⼀些资源清理⼯作。
如前所述,Interceptor 可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线
程安全。另外倘若指定了多个 Interceptor,则 Producer 将按照指定顺序调⽤它们,并仅仅是
捕获每个 Interceptor 可能抛出的异常记录到错误⽇志中⽽⾮在向上传递。这在使⽤过程中要
特别留意。
1.3.2 自定义拦截器
自定义拦截器步骤: