Flink SQL专门用 OVER 聚合做了优化实现。所以只有在 Top N 的应用场景中,OVER 窗口 ORDER BY后才可以指定其它排序字段;而要想实现 Top N,就必须按照上面的格式进行定义,否则 Flink SQL 的优化器将无法正常解析。而且,目前 Table API 中并不支持 ROW_NUMBER()函数,所以也 只有 SQL 中这一种通用的 Top N 实现方式 public static void main(String[] args) { //执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment); //执行SQL tableEnvironment.executeSql("CREATE TABLE t_goods (\n" + " gid STRING,\n" + " type INT,\n" + " price INT,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='1',\n" + " 'fields.gid.length'='10',\n" + " 'fields.type.min'='1',\n" + " 'fields.type.max'='1',\n" + " 'fields.price.min'='100',\n" + " 'fields.price.max'='999'\n" + ")"); // tableEnvironment.sqlQuery("select * from t_goods").execute().print(); //排序开窗函数--所有数据的排序 tableEnvironment.sqlQuery("select * from (" + " select *, ROW_NUMBER() OVER (" + " PARTITION BY type " + " ORDER BY price desc " + " ) AS rownum from t_goods" + ") WHERE rownum <= 3 ").execute().print(); }。怎么跟别人解释。。。
时间: 2023-07-08 16:05:49 浏览: 69
高性能FlinkSQL优化技巧
Flink SQL专门对OVER聚合进行了优化实现,因此只有在Top N的应用场景中,OVER窗口ORDER BY后才可以指定其他排序字段。要实现Top N功能,必须按照格式进行定义,否则Flink SQL的优化器将无法正常解析。此外,目前Table API中并不支持ROW_NUMBER()函数,所以只有在SQL中使用这种通用的Top N实现方式。以上代码展示了如何使用SQL查询从数据源中获取前三个最高价格的商品,并按照商品类型进行分区。
阅读全文