Flink SQL专门用 OVER 聚合做了优化实现。所以只有在 Top N 的应用场景中,OVER 窗口 ORDER BY后才可以指定其它排序字段;而要想实现 Top N,就必须按照上面的格式进行定义,否则 Flink SQL 的优化器将无法正常解析。而且,目前 Table API 中并不支持 ROW_NUMBER()函数,所以也 只有 SQL 中这一种通用的 Top N 实现方式 public static void main(String[] args) { //执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment); //执行SQL tableEnvironment.executeSql("CREATE TABLE t_goods (\n" + " gid STRING,\n" + " type INT,\n" + " price INT,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='1',\n" + " 'fields.gid.length'='10',\n" + " 'fields.type.min'='1',\n" + " 'fields.type.max'='1',\n" + " 'fields.price.min'='100',\n" + " 'fields.price.max'='999'\n" + ")"); // tableEnvironment.sqlQuery("select * from t_goods").execute().print(); //排序开窗函数--所有数据的排序 tableEnvironment.sqlQuery("select * from (" + " select *, ROW_NUMBER() OVER (" + " PARTITION BY type " + " ORDER BY price desc " + " ) AS rownum from t_goods" + ") WHERE rownum <= 3 ").execute().print(); }。怎么理解。。。
时间: 2023-07-08 07:05:49 浏览: 129
Flink SQL 中的 OVER 函数是用于聚合和排序的一种优化实现方式。对于 Top N 的场景,可以通过 OVER 函数实现。但是需要注意的是,只有在 ORDER BY 后才能指定其它排序字段,否则 Flink SQL 的优化器将无法正常解析。此外,Table API 中并不支持 ROW_NUMBER() 函数,所以只能在 SQL 中使用这种通用的 Top N 实现方式。以上代码中,首先创建了一个数据源表 t_goods,然后使用 SQL 查询语句实现 Top N 功能,其中 ROW_NUMBER() 函数用于计算每个 type 分组内按 price 排序后的行号,最后通过 WHERE rownum <= 3 来筛选出前三个数据。
相关问题
在 Flink SQL 中,是通过 OVER 聚合和一个条件筛选来实现 Top N 的。。。怎么理解。。
在 Flink SQL 中,可以使用 OVER 聚合和条件筛选来实现 Top N 的功能。
首先,OVER 聚合是指在一个窗口内对整个数据集进行聚合操作,而不是仅聚合每个分组。可以使用 OVER() 子句来指定 OVER 聚合,比如:
```
SELECT word, count(*) OVER() as cnt
FROM words
```
其中,`count(*) OVER()` 会对整个数据集进行计数。
然后,可以在 OVER 聚合的基础上使用条件筛选来获取 Top N 数据。比如,要获取前 10 个出现次数最多的单词,可以这样写:
```
SELECT word, cnt
FROM (
SELECT word, count(*) OVER() as cnt
FROM words
) WHERE row_number() OVER (ORDER BY cnt DESC) <= 10
```
其中,`row_number() OVER (ORDER BY cnt DESC)` 会根据 cnt 字段降序排序,然后为每一行分配一个序号,最后通过条件筛选保留前 10 行数据。
也可以使用其他聚合函数,比如 SUM()、AVG() 等等,来实现不同的 Top N 功能。
8. FlinkSQL 聚合查询 8.3. TopN 8.3.1. 普通TopN 8.3.2. 窗口Top N
在 FlinkSQL 中可以使用 TopN 实现对数据流中某个字段的排序并取出前 N 个数据。TopN 算子适用于一些需要对数据进行排序、筛选的场景,比如热门商品排行、用户行为分析等。
普通 TopN 是指对整个数据流进行排序,取出前 N 条数据。在 FlinkSQL 中,可以使用如下语句实现普通 TopN:
```
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (ORDER BY column DESC) as rownum
FROM table
) t
WHERE rownum <= N
```
其中,ROW_NUMBER() OVER (ORDER BY column DESC) as rownum 表示对表中的某个字段进行降序排序,并为每一行分配一个 rownum,表示该行在排序后的位置。然后在外层 SELECT 语句中筛选出 rownum 小于等于 N 的数据即可。
窗口 TopN 是指对某个时间窗口内的数据进行排序,取出前 N 条数据。在 FlinkSQL 中,可以使用如下语句实现窗口 TopN:
```
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY window ORDER BY column DESC) as rownum
FROM table
GROUP BY window, other_column
) t
WHERE rownum <= N
```
其中,ROW_NUMBER() OVER (PARTITION BY window ORDER BY column DESC) as rownum 表示对每个时间窗口内的数据进行排序,并为每一行分配一个 rownum,表示该行在排序后的位置。在外层 SELECT 语句中筛选出 rownum 小于等于 N 的数据即可。需要注意的是,在窗口 TopN 中,需要使用 GROUP BY 将数据按照时间窗口和其他字段进行分组,否则会出现重复数据。
阅读全文