kafka 消费代码
时间: 2024-05-28 08:07:31 浏览: 121
SpringBoot整合kafka,代码简洁,自动分配分区和指定分区消费(亲测可用)
5星 · 资源好评率100%
Kafka是一个分布式的消息队列,其最重要的功能之一就是实时流处理。Kafka消费者(Consumer)代码通常由以下几个步骤组成:
1. 创建Kafka Consumer实例:首先需要创建一个Kafka Consumer实例,用于从指定的Topic中读取消息。
```
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```
2. 订阅Topic:使用consumer.subscribe()方法订阅一个或多个Topic。
```
consumer.subscribe(Collections.singletonList("my-topic"));
```
3. 消费消息:使用consumer.poll()方法来拉取消息,然后处理这些消息。在处理完一批消息后,需要手动提交offset以确保消息不会被重复消费。
```
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
```
4. 关闭Consumer:在退出应用程序前,需要调用consumer.close()方法来关闭Consumer实例。
```
consumer.close();
```
阅读全文