kafka 消费者创建
时间: 2024-06-03 13:05:29 浏览: 90
Kafka 消费者创建需要以下几个步骤:
1. 引入 Kafka 相关依赖,例如 kafka-clients。
2. 创建 Kafka 消费者配置对象,设置相关属性,例如 bootstrap.servers、group.id、enable.auto.commit、auto.commit.interval.ms 等。
3. 创建 KafkaConsumer 对象,将配置对象传入构造函数。
4. 订阅想要消费的主题,可以使用 subscribe 方法订阅一个主题或多个主题,也可以使用 assign 方法手动指定分区消费。
5. 在循环中调用 poll 方法获取消息,处理完消息后调用 commitSync 方法手动提交偏移量。
以下是一个简单的 Kafka 消费者创建示例代码:
```
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
consumer.commitSync();
}
```
阅读全文