java consumer
时间: 2023-07-03 10:31:36 浏览: 86
您好!您这里提到的 "java consumer" 是否指的是 Kafka 消费者(consumer)的 Java 实现呢?如果是的话,Kafka 提供了官方的 Java 客户端库,可以方便地使用 Java 编写 Kafka 消费者。以下是一个简单的 Kafka 消费者示例代码:
```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class MyKafkaConsumer {
public static void main(String[] args) {
// 配置 Kafka 消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建 Kafka 消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("my-topic"));
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
```
以上代码中,我们使用 KafkaConsumer 类创建了一个 Kafka 消费者实例,并订阅了名为 "my-topic" 的主题。在 while 循环中,我们使用 poll() 方法从 Kafka 集群中拉取消息,然后遍历 ConsumerRecords 对象,逐条消费消息。
希望这个简单的示例能够帮助您了解如何使用 Java 编写 Kafka 消费者。
阅读全文