消费者客户端开发
在了解了消费者与消费组之间的概念之后,我们就可以着⼿进⾏消费者客户端的开发了。在Kafka的历史中,消费者客户端同⽣产者客户端⼀样也经历了两个⼤版本:第⼀个是于Kafka开源之初使⽤
Scala语⾔编写的客户端,我们可以称之为旧消费者客户端(OldConsumer)或Scala消费者客户端;第⼆个是从Kafka0.9.x版本开始推出的使⽤Java编写的客户端,我们可以称之为新消费者客户端
(NewConsumer)或Java消费者客户端,它弥补了旧客户端中存在的诸多设计缺陷
本节主要介绍⽬前流⾏的新消费者(Java语⾔编写的)客户端,⽽旧消费者客户端已被淘汰,故不再做相应的介绍了。
⼀个正常的消费逻辑需要具备以下⼏个步骤:
1. 配置消费者客户端参数及创建相应的消费者实例。
2. 订阅主题。
3. 拉取消息并消费。
4. 提交消费位移。
5. 关闭消费者实例。
代码清单2-2中已经简单对消费者客户端的编码做了演⽰,本节对其稍做修改,如代码清单8-1所⽰。
//代码清单8-1 消费者客户端示例
public class KafkaConsumerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig(){
Properties props = new Properties();
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("client.id", "consumer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic()
+ ", partition = "+ record.partition()
+ ", offset = " + record.offset());
System.out.println("key = " + record.key()
+ ", value = " + record.value());
//do something to process record.
}
}
} catch (Exception e) {
log.error("occur exception ", e);
} finally {
consumer.close();
}
}
}
相⽐于代码清单2-2⽽⾔,修改过后的代码多了⼀点东西,我们按照消费逻辑的各个步骤来做相应的分析。
必要的参数配置
在创建真正的消费者实例之前需要做相应的参数配置,⽐如上⼀节中的设置消费者所属的消费组的名称、连接地址等。参照代码清单8-1中的initConfig()⽅法,在Kafka消费者客户端KafkaConsumer中有
4个参数是必填的。
bootstrap.servers:该参数的释义和⽣产者客户端KafkaProducer中的相同,⽤来指定连接Kafka集群所需的broker地址清单,具体内容形式为host1:port1,host2:post,可以设置⼀个或多个地址,中间
⽤逗号隔开,此参数的默认值为“”。注意这⾥并⾮需要设置集群中全部的broker地址,消费者会从现有的配置中查找到全部的Kafka集群成员。这⾥设置两个以上的broker地址信息,当其中任意⼀
个宕机时,消费者仍然可以连接到Kafka集群上。
group.id:消费者⾪属的消费组的名称,默认值为“”。如果设置为空,则会报出异常:Exceptioninthread"main"org.apache.kafka.common.errors.InvalidGroupIdException:TheconfiguredgroupIdis
invalid。⼀般⽽⾔,这个参数需要设置成具有⼀定的业务意义的名称。
key.deserializer和value.deserializer:与⽣产者客户端KafkaProducer中的key.serializer和value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执⾏相应
的反序列化操作才能还原成原有的对象格式。这两个参数分别⽤来指定消息中key和value所需反序列化操作的反序列化器,这两个参数⽆默认值。注意这⾥必须填写反序列化器类的全限定名,⽐
如⽰例中的org.apache.kafka.common.serialization.StringDeserializer,单单指定StringDeserializer是错误的。有关更多的反序列化内容可以参考下⼀节。
注意到代码清单8-1中的initConfig()⽅法⾥还设置了⼀个参数client.id,这个参数⽤来设定KafkaConsumer对应的客户端id,默认值也为“”。如果客户端不设置,则KafkaConsumer会⾃动⽣成⼀个⾮空字
符串,内容形式如“consumer-1”、“consumer-2”,即字符串“consumer-”与数字的拼接。
KafkaConsumer中的参数众多,远⾮⽰例initConfig()⽅法中的那样只有5个,开发⼈员可以根据业务应⽤的实际需求来修改这些参数的默认值,以达到灵活调配的⽬的。⼀般情况下,普通开发⼈员⽆法全
部记住所有的参数名称,只能有个⼤致的印象,在实际使⽤过程中,诸如“key.deserializer”、“auto.offset.reset”之类的字符串经常由于⼈为因素⽽书写错误。为此,我们可以直接使⽤客户端中的
org.apache.kafka.clients.consumer.ConsumerConfig类来做⼀定程度上的预防,每个参数在ConsumerConfig类中都有对应的名称,就以代码清单8-1中的initConfig()⽅法为例,引⼊ConsumerConfig后的修改
结果如下:
public static Properties initConfig(){
Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client.id.demo");
return props;
}
注意到上⾯的代码中key.deserializer和value.deserializer参数对应类的全限定名⽐较长,也⽐较容易写错,这⾥通过Java中的技巧来做进⼀步的改进,相关代码如下:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
如此代码就简洁了许多,同时也预防了⼈为出错的可能。在配置完参数之后,我们就可以使⽤它来创建⼀个消费者实例:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
本节介绍的KafkaConsumer配置相关的内容基本上和介绍KafkaProducer配置时的⼀样,除了配置对应的反序列化器,只多了⼀个必要的group.id参数。
订阅主题和分区
在创建好消费者之后,我们就需要为该消费者订阅相关的主题了。⼀个消费者可以订阅⼀个或多个主题,代码清单8-1中我们使⽤subscribe()⽅法订阅了⼀个主题,对于这个⽅法⽽⾔,既可以以集合的形
式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe的⼏个重载⽅法如下:
public void subscribe(Collection<String> topics,
ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
对于消费者使⽤集合的⽅式(subscribe(Collection))来订阅主题⽽⾔,⽐较容易理解,订阅了什么主题就消费什么主题中的消息。如果前后两次订阅了不同的主题,那么消费者以最后⼀次的为准。
consumer.subscribe(Arrays.asList(topic1));
consumer.subscribe(Arrays.asList(topic2));
上⾯的⽰例中,最终消费者订阅的是topic2,⽽不是topic1,也不是topic1和topic2的并集。
如果消费者采⽤的是正则表达式的⽅式(subscribe(Pattern))订阅,在之后的过程中,如果有⼈又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的
消息。如果应⽤程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅⽅式就很有效。在Kafka和其他系统之间进⾏数据复制时,这种正则表达式的⽅式就显得很常见。正则表达式的⽅式订
阅的⽰例如下: