使用java程序消费昨天0点到昨天24点范围内的kafka topic数据
时间: 2024-05-12 18:17:18 浏览: 136
指定时间段消费Kafka工具
好的,我明白了。对于您的问题,您可以通过Java中的Kafka Consumer API来消费kafka topic数据。具体来说,您需要创建一个KafkaConsumer对象,并设置相关的配置,然后订阅您想要消费的topic,最后从consumer对象中接收消息并处理它们即可。
以下是一个Java程序示例,展示如何消费昨天0点到昨天24点范围内的kafka topic数据:
```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "your_topic_name";
private static final String BOOTSTRAP_SERVERS = "localhost:9092"; // replace with your kafka broker's address
public static void main(String[] args) {
// configure kafka consumer
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// calculate start and end offsets for yesterday's data
LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
Instant startInstant = yesterday.withHour(0).withMinute(0).withSecond(0).toInstant(ZoneOffset.UTC);
Instant endInstant = yesterday.withHour(23).withMinute(59).withSecond(59).toInstant(ZoneOffset.UTC);
long startOffset = consumer.offsetsForTimes(Collections.singletonMap(TOPIC_NAME, startInstant)).get(TOPIC_NAME).offset();
long endOffset = consumer.offsetsForTimes(Collections.singletonMap(TOPIC_NAME, endInstant)).get(TOPIC_NAME).offset();
// subscribe to the topic and seek to the start offset
consumer.subscribe(Collections.singleton(TOPIC_NAME));
consumer.seek(TOPIC_NAME, startOffset);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
// process the record
long currentOffset = record.offset();
if (currentOffset > endOffset) {
consumer.close();
}
});
}
}
}
```
需要注意的是,此示例代码尚未经过完整测试,可能存在错误或问题。建议您在实际生产环境中进行更全面的测试和部署前的严格代码审查。
阅读全文