flink sql 读取mysql 多个表 java
时间: 2024-03-03 20:47:30 浏览: 148
使用flink-connector-sqlserver-cdc 2.3.0把数据从SQL Server实时同步到MySQL中
5星 · 资源好评率100%
要在Java中使用Flink SQL读取MySQL的多个表,可以使用Flink的JDBC Connector。首先,需要在Maven的依赖中添加以下依赖项:
```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
```
然后,可以使用类似下面的代码创建一个外部表:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.table.sources.JdbcTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.Row;
public class ReadMySQLTable {
public static void main(String[] args) throws Exception {
// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// define the schema of the table
String[] fieldNames = {"id", "name", "age", "address"};
LogicalType[] fieldTypes = {
new BigIntType(),
new VarCharType(),
new IntegerType(),
new VarCharType()
};
TableSchema schema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
// define the JDBC connector options
JdbcTableSource jdbcTableSource = JdbcTableSource.builder()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/my_db")
.setUsername("my_user")
.setPassword("my_password")
.setQuery("SELECT * FROM my_table1 UNION SELECT * FROM my_table2")
.setSchema(schema)
.build();
// register the table source in the table environment
tableEnv.registerTableSource("my_table", jdbcTableSource);
// execute the query and print the results
tableEnv.sqlQuery("SELECT * FROM my_table").toRetractStream(Row.class).print();
// execute the program
env.execute("Read MySQL Table");
}
}
```
这里的`my_table1`和`my_table2`是MySQL中的两个表,`my_db`是这些表所在的数据库,`my_user`和`my_password`是连接MySQL所需要的用户名和密码。这里使用了UNION操作将这两个表合并在一起,然后将它们作为一个JDBC Table Source注册到Flink的Table Environment中。最后,执行SQL查询并打印结果。
阅读全文