用Scala 写一段flink 连接mysql的代码
时间: 2024-02-23 18:57:50 浏览: 153
链接MySQL的代码
以下是一个简单的使用Scala连接MySQL数据库,并使用Flink进行数据操作的程序:
```scala
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
object MySQLSink {
def main(args: Array[String]): Unit = {
// 创建一个执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 读取数据源
val input = env.fromElements(("foo", 1), ("bar", 2), ("baz", 3))
// 将数据插入到MySQL数据库中
input.addSink(new MySQLSinkFunction)
// 执行计算并输出结果
env.execute("MySQL Sink Example")
}
class MySQLSinkFunction extends RichSinkFunction[(String, Int)] {
var connection: Connection = _
var preparedStatement: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
// 连接MySQL数据库
Class.forName("com.mysql.jdbc.Driver")
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "password")
// 创建预编译语句
preparedStatement = connection.prepareStatement("INSERT INTO wordcount (word, count) VALUES (?, ?)")
}
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
// 将数据插入到MySQL数据库中
preparedStatement.setString(1, value._1)
preparedStatement.setInt(2, value._2)
preparedStatement.executeUpdate()
}
override def close(): Unit = {
// 关闭连接
if (preparedStatement != null) preparedStatement.close()
if (connection != null) connection.close()
}
}
}
```
运行以上代码,可以将数据源中的数据插入到MySQL数据库的`wordcount`表中。需要注意的是,上述代码中的数据库连接信息需要根据实际情况进行修改。
阅读全文