flink sql 读取kafka demo 详细
时间: 2023-08-09 22:05:12 浏览: 179
flink读取kafka数据.zip
以下是使用 Flink SQL 读取 Kafka 数据源的一个简单示例:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.descriptors.StreamTableDescriptor;
public class KafkaDemo {
public static void main(String[] args) throws Exception {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// 定义 Kafka 数据源
StreamTableDescriptor tableDescriptor = tEnv.connect(
new Kafka()
.version("universal")
.topic("input-topic")
.startFromLatest()
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "flink-group")
).withFormat(new Json().failOnMissingField(false).deriveSchema()).withSchema(
new Schema()
.field("id", DataTypes.STRING())
.field("name", DataTypes.STRING())
);
// 创建表
tEnv.createTable("input_table", tableDescriptor);
// 执行查询
Table result = tEnv.sqlQuery("SELECT * FROM input_table WHERE id = '1'");
result.printSchema();
// 执行任务
env.execute("KafkaDemo");
}
}
```
在上述代码中,我们首先创建了一个 `StreamExecutionEnvironment` 实例和一个 `StreamTableEnvironment` 实例,分别用于定义 Flink 执行环境和 Flink SQL 执行环境。
然后,我们定义了一个 Kafka 数据源,其中包括 Kafka 的连接信息、topic 名称和消费者组 ID。我们还指定了数据源的格式为 JSON,并定义了数据源中包含的字段和字段类型。
接下来,我们使用 `createTable()` 方法创建了一个名为 `input_table` 的表,并将 Kafka 数据源和数据格式和字段信息传递给 `withFormat()` 和 `withSchema()` 方法。
最后,我们使用 Flink SQL 查询语句 `SELECT * FROM input_table WHERE id = '1'` 查询 Kafka 中 id 为 1 的数据,并将查询结果打印出来。
需要注意的是,上述示例中使用的是 Flink 1.13 版本的新 API,如果使用的是旧版本的 Flink,可能需要使用不同的 API 进行操作。
阅读全文