kafaka 监听消息案例
时间: 2023-10-11 14:21:00 浏览: 72
Kafka 中监听消息的方式通常使用 Consumer API 进行消费。以下是一个使用 Kafka Consumer API 监听消息的案例:
```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
String topicName = "test-topic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
}
```
上述代码首先创建了一个 KafkaConsumer 对象,并通过 `subscribe()` 函数订阅了一个主题。然后,使用一个无限循环来持续监听消息。在循环中,使用 `poll()` 函数获取消息记录,然后遍历消息记录并输出消息的 offset、key 和 value。
需要注意的是,代码中的 `bootstrap.servers` 配置需要指定 Kafka 集群中的至少一个 broker 地址,如果有多个 broker,则可以用逗号分隔指定多个地址。另外,`group.id` 配置用于指定消费者组的 ID,同一个消费者组中的多个消费者会共同消费一个主题的消息。
以上就是一个简单的 Kafka 监听消息的案例,可以根据实际业务需求进行定制。
阅读全文