flink1.11 SQL 使用Kafka Connector 时 如何获取kafka消息日志时间
时间: 2024-02-03 15:12:57 浏览: 185
在 Flink 1.11 中使用 Kafka Connector 时,可以通过设置 `timestamp.extractor` 参数来指定消息时间戳的提取方式。如果你想要获取 Kafka 消息的日志时间,可以使用 `LogAndSkipOnInvalidTimestamp` 提取方式,并将 `timestamp.extractor.watermark.delay-ms` 参数设置为 0。
具体来说,你需要在创建 Kafka 数据源时设置 `timestamp.extractor` 和 `timestamp.extractor.watermark.delay-ms` 参数,示例如下:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.DeserializationSchemaWrapper;
import java.util.Properties;
import java.util.regex.Pattern;
public class KafkaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
Pattern.compile("test-topic.*"),
new LogAndSkipOnInvalidTimestamp<>(), // 设置 timestamp.extractor
properties);
consumer.setStartFromEarliest();
consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
@Override
public long extractTimestamp(String element, long previousTimestamp) {
// 不需要实现,因为我们已经在 Kafka Consumer 中设置了 timestamp.extractor
return 0;
}
@Override
public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
// 不需要实现,因为我们已经在 Kafka Consumer 中设置了 timestamp.extractor.watermark.delay-ms
return null;
}
});
env
.addSource(consumer)
.print();
env.execute("Kafka Source Example");
}
public static class LogAndSkipOnInvalidTimestamp<T> extends DeserializationSchemaWrapper<T> {
public LogAndSkipOnInvalidTimestamp() {
super(new SimpleStringSchema());
}
@Override
public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws Exception {
try {
// 提取消息时间戳
Long timestamp = Long.valueOf(topic.split("-")[1]);
// 构造一个带时间戳的元组
return (T) Tuple2.of(new String(messageKey), new String(message), timestamp);
} catch (Exception e) {
// 如果提取时间戳失败,则打印一条日志并跳过该条消息
System.err.println("Skip invalid message: " + new String(message));
return null;
}
}
}
}
```
上述示例代码中,我们通过自定义 `LogAndSkipOnInvalidTimestamp` 类来实现了 `KafkaDeserializationSchema` 接口,并在其中提取了 Kafka 消息的日志时间戳。在 `deserialize` 方法中,我们将 Kafka 消息转换为一个带时间戳的元组,并在返回时进行了类型转换。
在 `main` 函数中,我们通过 `new LogAndSkipOnInvalidTimestamp<>()` 来设置了 `timestamp.extractor` 参数,并将 `timestamp.extractor.watermark.delay-ms` 参数设置为 0。这样就可以在 Flink SQL 中使用带时间戳的元组来进行数据处理了。
阅读全文