flink设置从当前时间开始读kafka
时间: 2023-07-26 13:45:35 浏览: 94
Flink 可以使用 `FlinkKafkaConsumer` 中的 `setStartFromLatest()` 方法来从当前时间开始消费 Kafka 消息。
以下是示例代码:
```java
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test-group");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
consumer.setStartFromLatest();
DataStream<String> stream = env.addSource(consumer)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
@Override
public long extractAscendingTimestamp(String element) {
// 设置事件时间
return System.currentTimeMillis();
}
});
```
在上面的示例代码中,通过 `setStartFromLatest()` 方法设置从当前时间开始消费 Kafka 消息,并通过 `assignTimestampsAndWatermarks` 方法设置事件时间。
阅读全文