flink流批一体sql
时间: 2023-12-06 19:38:11 浏览: 147
flinkStreamSQL
Flink提供了Table API和SQL作为*********```sql
-- 创建一个输入表
CREATE TABLE input_table (
id BIGINT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'input_topic',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'json'
);
-- 创建一个输出表
CREATE TABLE output_table (
name STRING,
age INT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/test',
'connector.table' = 'output_table',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '5000'
);
-- 执行查询并将结果插入到输出表中
INSERT INTO output_table
SELECT name, MAX(age) AS age
FROM input_table
GROUP BY name;
```
上述例子中,我们使用Flink SQL创建了一个输入表和一个输出表,并执行了一个查询,将结果插入***
阅读全文