请给出使用 Flink 1.13.0 将具有一个主键和两个值的数据插入和修改到 MySQL 数据库的示例代码
时间: 2024-05-03 19:22:02 浏览: 156
好的,以下是示例代码:
```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.configuration.Configuration
import java.sql.{Connection, DriverManager, PreparedStatement}
case class Data(name: String, value1: Int, value2: Int)
object FlinkMySQL {
def main(args: Array[String]): Unit = {
// Flink 程序入口
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env.socketTextStream("localhost", 9000)
.map(line => {
val arr = line.split(",")
Data(arr(0), arr(1).toInt, arr(2).toInt)
})
dataStream.addSink(new MySQLSink)
env.execute("Flink MySQL")
}
class MySQLSink extends RichSinkFunction[Data] {
var conn: Connection = _
var statement: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
super.open(parameters) // 必须要调用基类的方法
val url = "jdbc:mysql://localhost:3306/mydb?useSSL=false&allowPublicKeyRetrieval=true" // MySQL 连接字符串
val username = "root" // 数据库用户名
val password = "root" // 数据库密码
conn = DriverManager.getConnection(url, username, password) // 获取数据库连接
statement = conn.prepareStatement("INSERT INTO mytable(name,value1,value2) VALUES(?,?,?) " +
"ON DUPLICATE KEY UPDATE value1=?, value2=?") // 预编译 SQL 语句
}
override def invoke(value: Data, context: RuntimeContext[_]): Unit = {
statement.setString(1, value.name)
statement.setInt(2, value.value1)
statement.setInt(3, value.value2)
statement.setInt(4, value.value1)
statement.setInt(5, value.value2)
statement.execute() // 执行 SQL 语句
}
override def close(): Unit = {
super.close() // 必须要调用基类的方法
if (statement != null) {
statement.close() // 关闭 Statement
}
if (conn != null) {
conn.close() // 关闭 Connection
}
}
}
}
```
这是一个使用 Flink 将数据插入 MySQL 数据库的示例代码,其中使用了 `RichSinkFunction` 类来编写自定义的 sink 函数。具体的步骤包括:
1. 从 Socket 中读取数据,并将数据解析为 `Data` 对象;
2. 将解析后的 `Data` 对象通过 `addSink` 方法送到自定义的 `MySQLSink` 中;
3. 在 `MySQLSink` 中,首先在 `open` 方法中获取数据库连接,并预编译 SQL 语句;
4. 在 `invoke` 方法中,将 `Data` 对象的字段值分别绑定到 SQL 语句中的占位符上,并执行 SQL 语句;
5. 在 `close` 方法中,关闭数据库连接。
注意,这里使用了 MySQL 中的 `ON DUPLICATE KEY UPDATE` 语法,可以实现在 `mytable` 表中通过 `name` 字段自动判断数据是否存在,如果存在,则更新 `value1` 和 `value2`,否则插入一条新的记录。这里假设 `mytable` 表已经事先创建好了,字段名称和类型与 `Data` 类的字段一一对应。另外,为了演示方便,连接字符串、用户名和密码都是硬编码,实际使用时需要从配置文件中读取。
阅读全文