kafka消费当前数据
时间: 2023-07-03 07:04:43 浏览: 122
Flink消费Kafka数据1
要消费 Kafka 中的当前数据,你需要使用 Kafka Consumer API。首先,你需要创建一个 Kafka Consumer 实例并配置它所连接的 Kafka 集群的地址。然后,你需要订阅一个或多个 Kafka 主题,以便从这些主题中接收数据。最后,你需要在启动 Consumer 实例后循环调用 poll() 方法来获取数据记录。
以下是一个使用 Java Kafka Consumer API 消费当前数据的示例代码:
```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
records.forEach(record -> {
System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n",
record.key(), record.value(), record.partition(), record.offset());
});
}
}
}
```
在此示例中,我们创建了一个 Kafka Consumer 实例,并配置它所连接的 Kafka 集群的地址和消费者组 ID。然后,我们订阅了一个名为 "test-topic" 的 Kafka 主题。最后,我们在一个无限循环中调用 poll() 方法来获取 Kafka 中的当前数据,并将每个数据记录的关键字、值、分区和偏移量打印到控制台上。
阅读全文