给出一个flink sql读取kafka数据的demo
时间: 2023-11-19 12:04:23 浏览: 92
Flink实时读取Kafka数据批量聚合(定时按数量)写入Mysql.rar
5星 · 资源好评率100%
好的,以下是一个 Flink 1.14 SQL 读取 Kafka 数据的示例代码:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
public class FlinkSQLKafkaDemo {
public static void main(String[] args) throws Exception {
// 创建 Flink 流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建 Flink Table 环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Kafka 配置
String topic = "test_topic";
String bootstrapServers = "localhost:9092";
String groupId = "test_group_id";
// 定义 Kafka 数据源
Kafka kafka = new Kafka()
.version("universal")
.topic(topic)
.property("bootstrap.servers", bootstrapServers)
.property("group.id", groupId);
// 定义格式化器
Schema schema = new Schema()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT());
// 创建一个临时表
tableEnv.connect(kafka)
.withFormat(new Json().schema(schema))
.withSchema(new Schema()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT()))
.createTemporaryTable("test_table");
// 执行查询
TableResult result = tableEnv.executeSql("SELECT name, age FROM test_table WHERE age > 18");
// 输出结果
result.print();
}
}
```
这个示例代码演示了如何使用 Flink SQL 和 Kafka Connector 来读取 Kafka 数据,并在 SQL 查询中过滤数据。在示例中,我们首先定义了 Kafka 数据源和数据格式化器,然后使用 `createTemporaryTable` 方法创建了一个临时表,最后使用 `executeSql` 方法执行了一个 SQL 查询,并将结果打印出来。需要注意的是,示例中的 JSON 格式化器需要额外依赖 `flink-json` 库。
阅读全文