Reading MySQL with Flink's native planner
Date: 2023-07-10 17:40:37
Flink's native planner can read data from MySQL through Flink's JDBC connector. The steps are as follows:
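1. Import the Flink Table API and DataStream classes (the JDBC connector and the MySQL JDBC driver are classpath dependencies, not imports):
```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
import static org.apache.flink.table.api.Expressions.$;
```
2. Create the Flink table environment:
```java
// Blink is the default planner since Flink 1.11; a StreamTableEnvironment is needed for the DataStream conversion in step 7.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);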
```
3. Set the MySQL connection properties:
```java
Properties properties = new Properties();
properties.setProperty("url", "jdbc:mysql://localhost:3306/test");
// Driver class for MySQL Connector/J 8.x; Connector/J 5.x uses com.mysql.jdbc.Driver.
properties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
properties.setProperty("username", "root");
properties.setProperty("password", "password");
```
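Because a missing key only surfaces later as a connection failure, it can help to check the properties up front. The following is a minimal sketch using only the JDK; `MySqlConnectionProps` and `validate` are hypothetical names introduced here for illustration, not part of Flink or the MySQL driver, and the set of required keys is an assumption taken from the step above.

```java
import java.util.Properties;

/** Hypothetical helper (not a Flink class): checks connection settings before they are used. */
final class MySqlConnectionProps {
    // Keys assumed to be required, mirroring the properties set in the step above.
    private static final String[] REQUIRED = {"url", "driver", "username", "password"};

    /** Returns the same Properties object if every required key is set and the URL targets MySQL. */
    static Properties validate(Properties props) {
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing connection property: " + key);
            }
        }
        String url = props.getProperty("url");
        if (!url.startsWith("jdbc:mysql://")) {
            throw new IllegalArgumentException("Not a MySQL JDBC URL: " + url);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("url", "jdbc:mysql://localhost:3306/test");
        properties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        properties.setProperty("username", "root");
        properties.setProperty("password", "password");
        validate(properties); // throws IllegalArgumentException if something is missing
        System.out.println("connection properties look complete");
    }
}
```

This sketch treats an empty password as an error; relax that check if the database account really has no password.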
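4. Declare the MySQL table with the JDBC connector (Flink 1.11+, requires the `flink-connector-jdbc` dependency):
```java
// Column names and STRING types are placeholders; match them to the real MySQL table.
String createTableDdl = String.format(
    "CREATE TABLE table_name (column1 STRING, column2 STRING) WITH ("
        + "'connector' = 'jdbc', 'url' = '%s', 'driver' = '%s', "
        + "'table-name' = 'table_name', 'username' = '%s', 'password' = '%s')",
    properties.getProperty("url"), properties.getProperty("driver"),
    properties.getProperty("username"), properties.getProperty("password"));
```
5. Register the table:
```java
tableEnv.executeSql(createTableDdl);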
```
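When several MySQL tables need registering, the `CREATE TABLE ... WITH ('connector' = 'jdbc', ...)` statement can be generated instead of written by hand. The sketch below uses only the JDK. `JdbcTableDdl`, `quote`, and `build` are hypothetical names introduced for illustration. The option keys (`connector`, `url`, `driver`, `table-name`, `username`, `password`) are those of Flink's JDBC SQL connector, and the column list is an assumption about the target table.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;

/** Hypothetical helper (not a Flink class): renders CREATE TABLE DDL for the JDBC connector. */
final class JdbcTableDdl {

    /** Wraps a value in single quotes, doubling embedded quotes so it is a valid SQL string literal. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /**
     * @param flinkTable name under which the table is registered in Flink
     * @param mysqlTable name of the table inside MySQL
     * @param columns    column name to Flink SQL type, in declaration order
     * @param props      must contain the keys url, driver, username, password
     */
    static String build(String flinkTable, String mysqlTable,
                        Map<String, String> columns, Properties props) {
        StringJoiner cols = new StringJoiner(", ");
        columns.forEach((name, type) -> cols.add("`" + name + "` " + type));

        StringJoiner opts = new StringJoiner(", ");
        opts.add("'connector' = 'jdbc'");
        opts.add("'url' = " + quote(props.getProperty("url")));
        opts.add("'driver' = " + quote(props.getProperty("driver")));
        opts.add("'table-name' = " + quote(mysqlTable));
        opts.add("'username' = " + quote(props.getProperty("username")));
        opts.add("'password' = " + quote(props.getProperty("password")));

        return "CREATE TABLE `" + flinkTable + "` (" + cols + ") WITH (" + opts + ")";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("url", "jdbc:mysql://localhost:3306/test");
        props.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        props.setProperty("username", "root");
        props.setProperty("password", "password");

        // Placeholder schema: two string columns, as in the query step.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("column1", "STRING");
        columns.put("column2", "STRING");

        System.out.println(build("table_name", "table_name", columns, props));
    }
}
```

The returned string is meant to be passed to `tableEnv.executeSql(...)`. Quoting the option values matters mainly for passwords, which may themselves contain a single quote.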
6. Query the data:
```java
Table result = tableEnv.from("table_name")
.select($("column1"), $("column2"))
.where($("column1").isNotNull());
```
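7. Convert the query result to a DataStream. This requires `tableEnv` to be a `StreamTableEnvironment`; on Flink 1.13 and later, `toAppendStream` is deprecated in favor of `toDataStream`: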
```java
DataStream<Tuple2<String, String>> stream = tableEnv.toAppendStream(result, TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
```
These are the steps for reading data from MySQL with Flink's native planner.