Consuming Kafka topic data within a specified time range using a Java program
Date: 2024-05-08 08:20:19 Views: 130
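Hello, I understand your request. You can use the Kafka Consumer API from Java to consume the data of a Kafka topic within a specified time range. The idea is to first look up the offsets that correspond to the time range and position the Kafka consumer on them, then read each message in the topic from there. Here is a simple Java example: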
```java
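import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTimeRangeConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaTimeRangeConsumer.class);

    public static void main(String[] args) {
        String topicName = "your-topic-name";
        String groupId = "your-group-id";
        // Time range to consume, in epoch milliseconds: [startTimestamp, endTimestamp)
        long endTimestamp = System.currentTimeMillis();
        long startTimestamp = endTimestamp - 24L * 60 * 60 * 1000; // 24 hours ago

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Build the per-partition timestamp queries for both ends of the range
            List<TopicPartition> partitions = new ArrayList<>();
            Map<TopicPartition, Long> startQuery = new HashMap<>();
            Map<TopicPartition, Long> endQuery = new HashMap<>();
            for (PartitionInfo info : consumer.partitionsFor(topicName)) {
                TopicPartition tp = new TopicPartition(topicName, info.partition());
                partitions.add(tp);
                startQuery.put(tp, startTimestamp);
                endQuery.put(tp, endTimestamp);
            }
            // offsetsForTimes returns the earliest offset whose timestamp is >= the query,
            // or null when the partition has no such message
            Map<TopicPartition, OffsetAndTimestamp> startByTime = consumer.offsetsForTimes(startQuery);
            Map<TopicPartition, OffsetAndTimestamp> endByTime = consumer.offsetsForTimes(endQuery);
            Map<TopicPartition, Long> logEndOffsets = consumer.endOffsets(partitions);

            // Work out where to start and stop in each partition that has data in the range
            Map<TopicPartition, Long> startAt = new HashMap<>();
            Map<TopicPartition, Long> stopAt = new HashMap<>();
            for (TopicPartition tp : partitions) {
                OffsetAndTimestamp start = startByTime.get(tp);
                OffsetAndTimestamp end = endByTime.get(tp);
                long stopOffset = (end != null) ? end.offset() : logEndOffsets.get(tp);
                if (start != null && start.offset() < stopOffset) {
                    startAt.put(tp, start.offset());
                    stopAt.put(tp, stopOffset);
                }
            }

            // seek() needs an explicit assignment; subscribe() only assigns partitions
            // during a later poll(), so seeking right after it throws IllegalStateException
            consumer.assign(startAt.keySet());
            for (Map.Entry<TopicPartition, Long> entry : startAt.entrySet()) {
                consumer.seek(entry.getKey(), entry.getValue());
                logger.info("Consuming partition {} from offset {} up to (excluding) offset {}",
                        entry.getKey(), entry.getValue(), stopAt.get(entry.getKey()));
            }

            // Read messages until every partition has reached its stop offset
            while (!stopAt.isEmpty()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    Long stop = stopAt.get(new TopicPartition(record.topic(), record.partition()));
                    if (stop == null || record.offset() >= stop) {
                        continue; // past the end of the requested range
                    }
                    logger.info("Key: {}, Value: {}, Partition: {}, Offset: {}",
                            record.key(), record.value(), record.partition(), record.offset());
                }
                consumer.commitSync();
                // Stop fetching from partitions whose position has passed the stop offset
                for (TopicPartition tp : new ArrayList<>(stopAt.keySet())) {
                    if (consumer.position(tp) >= stopAt.get(tp)) {
                        consumer.pause(Collections.singletonList(tp));
                        stopAt.remove(tp);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Error while consuming", e);
        }
    }
}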
```
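The code above looks up, for each partition of the given topic, the offsets that correspond to the specified time range and then reads messages starting from those offsets. You can adjust the time range and the other configuration parameters as needed. Hope this helps!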
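To adjust the time range, the wall-clock bounds have to be turned into the epoch-millisecond timestamps that `offsetsForTimes` expects. The following is a minimal sketch of one way to do that using only the JDK. The `TimeWindow` class, its method names, and the `yyyy-MM-dd HH:mm:ss` input format are assumptions made for illustration and are not part of the Kafka API.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of the Kafka API): a half-open time window
// [startMs, endMs) expressed in epoch milliseconds.
final class TimeWindow {
    // Assumed input format, e.g. "2024-05-07 00:00:00"
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    final long startMs;
    final long endMs;

    TimeWindow(long startMs, long endMs) {
        if (endMs <= startMs) {
            throw new IllegalArgumentException("end must be after start");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    // Parses both bounds as local date-times in the given zone
    static TimeWindow parse(String start, String end, ZoneId zone) {
        return new TimeWindow(toEpochMillis(start, zone), toEpochMillis(end, zone));
    }

    private static long toEpochMillis(String text, ZoneId zone) {
        return LocalDateTime.parse(text, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    // True when the timestamp lies inside [startMs, endMs)
    boolean contains(long timestampMs) {
        return timestampMs >= startMs && timestampMs < endMs;
    }
}
```

With such a helper, `startMs` could be used as the timestamp passed to `offsetsForTimes`, and `contains(record.timestamp())` could be used to skip records whose timestamp falls outside the window.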