Flink SQL 实时读取 MySQL 表的 Scala 代码
时间: 2024-09-30 11:06:57 浏览: 37
```
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableResult}
/**
 * Minimal Flink SQL demo: registers a MySQL table through the JDBC connector
 * and prints every row of it to stdout.
 *
 * The connection settings are hard-coded for a local MySQL instance
 * (database `test`, table `user`, credentials `root`/`root`); adjust them
 * before running against a different database.
 *
 * NOTE(review): the JDBC connector is documented as a bounded scan source, so
 * this presumably reads a snapshot of the table and then finishes — confirm
 * whether a CDC connector is needed if continuous change capture is expected.
 */
object FlinkSqlDemo {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The Blink planner is the default planner, so the deprecated
    // `useBlinkPlanner()` call is not needed and is omitted here.
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // JDBC connection options interpolated into the DDL below.
    val mysqlConf = Map(
      "url" -> "jdbc:mysql://localhost:3306/test",
      "table-name" -> "user",
      "username" -> "root",
      "password" -> "root"
    )

    // `user` is a reserved keyword in Flink SQL, so the table identifier must
    // be escaped with backticks in every statement that references it.
    tableEnv.executeSql(
      s"""
         |CREATE TABLE `user` (
         |  user_id BIGINT,
         |  name STRING,
         |  sex STRING,
         |  money DOUBLE
         |) WITH (
         |  'connector' = 'jdbc',
         |  'driver' = 'com.mysql.jdbc.Driver',
         |  'url' = '${mysqlConf("url")}',
         |  'table-name' = '${mysqlConf("table-name")}',
         |  'username' = '${mysqlConf("username")}',
         |  'password' = '${mysqlConf("password")}'
         |)
         |""".stripMargin)

    // `executeSql` (unlike `sqlQuery`, which returns a `Table`) submits the
    // query and returns a `TableResult` whose rows can be printed directly.
    val result: TableResult = tableEnv.executeSql("SELECT * FROM `user`")
    result.print()

    // No `env.execute(...)` here: the job is already submitted by `executeSql`,
    // and no DataStream operators were defined on `env`, so calling it would
    // fail with "No operators defined in streaming topology".
  }
}
```
阅读全文