kafka接收消息实例
时间: 2023-03-22 08:01:15 浏览: 68
当使用Kafka接收消息时,需要先创建一个Kafka消费者,然后将消费者订阅到指定的主题上。接着,Kafka消费者会自动从主题上拉取消息并将其存储在内存缓冲区中。随后,消费者可以通过轮询内存缓冲区来检索已接收的消息,并对其进行处理。在处理完消息后,消费者需要手动提交偏移量以确保不会重复消费已处理的消息。下面是一个Kafka接收消息的示例代码:
```
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s, partition = %d, offset = %d%n",
record.key(), record.value(), record.partition(), record.offset());
}
consumer.commitAsync();
}
```