StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
时间: 2024-03-15 19:30:31 浏览: 15
这段代码是使用 Apache Flink 的 Java API 创建一个 StreamExecutionEnvironment 实例,它是一个基础类,用于配置 Flink 程序的执行环境和运行时参数。StreamExecutionEnvironment 提供了许多方法来定义数据源、数据转换和数据汇,以及设置并行度、检查点和容错机制等。通过在 StreamExecutionEnvironment 上调用这些方法,可以构建一个 Flink 流处理程序的数据流图,并将其提交到 Flink 集群或本地执行。
相关问题
Flink SQL专门用 OVER 聚合做了优化实现。所以只有在 Top N 的应用场景中,OVER 窗口 ORDER BY后才可以指定其它排序字段;而要想实现 Top N,就必须按照上面的格式进行定义,否则 Flink SQL 的优化器将无法正常解析。而且,目前 Table API 中并不支持 ROW_NUMBER()函数,所以也 只有 SQL 中这一种通用的 Top N 实现方式 public static void main(String[] args) { //执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(1); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment); //执行SQL tableEnvironment.executeSql("CREATE TABLE t_goods (\n" + " gid STRING,\n" + " type INT,\n" + " price INT,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='1',\n" + " 'fields.gid.length'='10',\n" + " 'fields.type.min'='1',\n" + " 'fields.type.max'='1',\n" + " 'fields.price.min'='100',\n" + " 'fields.price.max'='999'\n" + ")"); // tableEnvironment.sqlQuery("select * from t_goods").execute().print(); //排序开窗函数--所有数据的排序 tableEnvironment.sqlQuery("select * from (" + " select *, ROW_NUMBER() OVER (" + " PARTITION BY type " + " ORDER BY price desc " + " ) AS rownum from t_goods" + ") WHERE rownum <= 3 ").execute().print(); }。怎么理解。。。
Flink SQL 中的 OVER 函数是用于聚合和排序的一种优化实现方式。对于 Top N 的场景,可以通过 OVER 函数实现。但是需要注意的是,只有在 ORDER BY 后才能指定其它排序字段,否则 Flink SQL 的优化器将无法正常解析。此外,Table API 中并不支持 ROW_NUMBER() 函数,所以只能在 SQL 中使用这种通用的 Top N 实现方式。以上代码中,首先创建了一个数据源表 t_goods,然后使用 SQL 查询语句实现 Top N 功能,其中 ROW_NUMBER() 函数用于计算每个 type 分组内按 price 排序后的行号,最后通过 WHERE rownum <= 3 来筛选出前三个数据。
flink,Environment的四种创建方式
Flink提供了四种创建Execution Environment的方式:
1. LocalExecutionEnvironment:用于本地调试和测试。可以通过以下方式创建:
```java
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
```
2. RemoteExecutionEnvironment:用于连接到远程Flink集群。可以通过以下方式创建:
```java
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("host", port, "path/to/jar");
```
3. StreamExecutionEnvironment:用于流处理。可以通过以下方式创建:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
```
4. BatchExecutionEnvironment:用于批处理。可以通过以下方式创建:
```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
```
其中,第3种和第4种方式都是通过静态方法获取Execution Environment的实例,而第1种和第2种方式则是通过构造方法创建Execution Environment的实例。
相关推荐
![pdf](https://img-home.csdnimg.cn/images/20210720083512.png)
![rar](https://img-home.csdnimg.cn/images/20210720083606.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![rar](https://img-home.csdnimg.cn/images/20210720083606.png)