flink-table-planner-blink
时间: 2023-08-09 22:02:01 浏览: 193
Flink Table Planner是Flink中用于处理Table API和SQL查询的模块,而Blink是Flink的新一代查询引擎。Flink Table Planner的目标是通过将Table API和SQL查询转换为执行计划,使查询能够在Flink上进行分布式执行。而Blink是在Flink Table Planner基础上进行改进和优化的版本。
相比于传统的Flink Table Planner,Blink在多个方面进行了改进。首先,Blink引入了自己的查询优化器,可以对查询进行更加高效的优化。其次,Blink提供了更为灵活和强大的查询特性,包括对复杂的窗口操作、模式识别和图查询的支持。此外,Blink还针对查询的延迟做了优化,提供了更短的查询响应时间。
在Flink Table Planner中,Blink还引入了基于内存的存储引擎,即内存表。内存表可以将数据存储在内存中,以加快查询的速度。此外,Blink还引入了Elasticsearch作为Flink的索引数据库,可以提供更快速的索引查询。
总而言之,Flink Table Planner和Blink是Flink中处理Table API和SQL查询的模块,其中Blink是对Flink Table Planner的改进和优化版本。Blink相比传统的Flink Table Planner具有更高效的查询优化、更灵活的查询特性和更短的查询响应时间,同时还引入了内存表和Elasticsearch作为索引数据库,以提供更快速的查询速度。
相关问题
flink-jdbc
### 使用 Apache Flink 与 JDBC 集成进行数据处理
#### 创建执行环境
为了在 Apache Flink 中使用 JDBC 连接器,首先需要创建一个 Flink 执行环境。这可以通过 `StreamExecutionEnvironment` 来完成,它是所有 Flink 应用程序的基础组件之一。
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
```
#### 启用 Blink Planner
对于更复杂的 SQL 处理需求,建议启用 Blink Planner,它可以提供更好的性能和支持更多特性:
```java
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.newInstance()
.useBlinkPlanner() // 启用 Blink Planner 支持更多 SQL 特性[^5]
.inStreamingMode()
.build());
```
#### 注册输入表和输出表
接下来定义要操作的数据源(即输入表),以及目标存储位置(即输出表)。这里展示的是从 Kafka 获取订单信息作为输入,并将结果保存至 MySQL 数据库的过程。
```sql
CREATE TABLE orders (
order_id BIGINT,
product STRING,
amount INT
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
CREATE TABLE result (
order_id BIGINT,
total_amount DOUBLE
) WITH (
'connector' = 'jdbc', -- 使用 JDBC connector
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'result_table', -- 输出到指定的数据库表格
'username' = 'root',
'password' = 'your_password_here'
);
```
以上配置说明了如何设置两个表结构及其对应的连接方式;其中一个是基于Kafka的消息队列系统中的主题来获取JSON格式化的消息流,另一个则是通过JDBC协议向MySQL服务器发送查询请求并更新特定的关系型数据库记录。
#### 实现逻辑转换过程
最后一步就是编写实际业务逻辑代码,比如统计每个产品的销售总额并将这些汇总后的数值存入之前提到的结果表里去。
```sql
INSERT INTO result
SELECT order_id, SUM(amount * price) AS total_amount
FROM orders JOIN products ON orders.product=products.name
GROUP BY order_id;
```
这段SQL语句实现了对来自Kafka的主题中每条记录按照产品名称关联上价格字段之后求得总金额的操作,并最终把计算得到的新纪录插入到了名为`result`的目标表内。
阅读全文