flink 1.14版本 怎么用 create table as select
时间: 2024-03-12 16:05:33 浏览: 259
在 Flink 1.14 版本中,可以使用以下语句创建新表并将查询结果插入其中:
```sql
CREATE TABLE new_table_name
WITH (
'connector' = 'filesystem',
'path' = '/path/to/new_table_name',
'format' = 'csv'
)
AS
SELECT * FROM source_table_name;
```
其中,new_table_name 表示新表的名称,/path/to/new_table_name 表示新表在文件系统中的路径,csv 表示新表的数据格式,可以根据具体需要调整。通过 WITH 子句指定表的连接器、路径和格式等属性,AS 子句后面的 SELECT 语句用于指定要插入到新表中的数据。
除了 * 外,还可以指定要查询的特定列,例如:
```sql
CREATE TABLE new_table_name
WITH (
'connector' = 'filesystem',
'path' = '/path/to/new_table_name',
'format' = 'csv'
)
AS
SELECT col1, col2 FROM source_table_name;
```
该语句会创建一个新表 new_table_name,并将 source_table_name 中的 col1 和 col2 列插入到新表中。注意,在使用 CREATE TABLE AS SELECT 语句时,新表的连接器、路径和格式等属性都需要指定。
相关问题
flink 1.14版本 mongodb sink
在 Flink 1.14 版本中,MongoDB Sink 的使用方式与之前的版本有些不同。具体来说,需要使用新的 MongoDB Connector for Flink,该 Connector 与 Flink 的版本保持一致,可以在 Flink 官方网站上下载。下载完成后,需要将 Connector 的 JAR 包添加到 Flink 的 classpath 中。
在代码中,可以使用以下方式创建一个 MongoDB Sink:
```java
MongoDBOutputFormatConfig.Builder builder = MongoDBOutputFormatConfig.builder()
.setHosts("localhost:27017")
.setDatabase("mydb")
.setCollection("mycollection")
.setDocumentClass(MyType.class);
MongoDBOutputFormat<MyType> outputFormat = new MongoDBOutputFormat<>(builder.build());
DataStream<MyType> stream = ...
stream.writeUsingOutputFormat(outputFormat);
```
其中,`MyType` 是要写入到 MongoDB 的数据类型。需要注意的是,在 Flink 1.14 中,MongoDB Sink 不再是一个 DataStream Sink,而是一个 OutputFormat。因此,在将 Sink 添加到 DataStream 中时,需要使用 `writeUsingOutputFormat` 方法。
除了上述方式,Flink 1.14 还提供了一种更加简便的创建 MongoDB Sink 的方式,即通过 `MongoDB.sink()` 方法:
```java
DataStream<MyType> stream = ...
stream.sink(new MongoDBSink<>(MongoDB.sink(
"mongodb://localhost:27017/mydb.mycollection",
new MyTypeMapper())
));
```
这种方式可以更加方便地创建 MongoDB Sink,不需要手动创建 OutputFormat。
flink sql CREATE TABLE AS SELECT
"CREATE TABLE AS SELECT" 是 Flink SQL 中的一种语法,用于创建一个新的表并将查询结果插入该表中。
具体语法如下:
```sql
CREATE TABLE new_table_name AS SELECT * FROM source_table_name;
```
其中,new_table_name 表示新创建的表名,source_table_name 则表示原始表名,* 表示查询结果中的所有列。该语句会将 source_table_name 中的数据插入到 new_table_name 中,如果 new_table_name 已经存在,则会覆盖原有的表。
除了 * 外,还可以指定查询结果中的特定列,例如:
```sql
CREATE TABLE new_table_name (col1 INT, col2 STRING) AS SELECT col1, col2 FROM source_table_name;
```
该语句会创建一个新表 new_table_name,其中包含两个列 col1 和 col2,并将 source_table_name 中的对应列插入到新表中。
阅读全文