配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
时间: 2024-03-23 08:35:50 浏览: 115
好的,首先你需要在Spark中添加MySQL的JDBC驱动程序,可以在Spark的conf目录下的spark-defaults.conf文件中添加以下配置:
```conf
spark.driver.extraClassPath /path/to/mysql-connector-java.jar
spark.executor.extraClassPath /path/to/mysql-connector-java.jar
```
其中`/path/to/mysql-connector-java.jar`是MySQL的JDBC驱动程序的路径。
然后,你需要编写Spark SQL程序来连接MySQL数据库并插入数据。以下是一个示例程序:
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
object MySQLExample {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("MySQLExample")
.master("local[*]")
.getOrCreate()
// 读取MySQL中的数据
val url = "jdbc:mysql://localhost:3306/test"
val user = "root"
val password = "password"
val table = "person"
val df = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.load()
// 插入数据到MySQL中
val data = Seq((3, "John", 25), (4, "Mary", 30))
val columns = Seq("id", "name", "age")
val insertDF = spark.createDataFrame(data).toDF(columns: _*)
insertDF.write
.format("jdbc")
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.mode("append")
.save()
// 计算age的最大值和总和
val maxAge = df.selectExpr("max(age)").collect()(0)(0)
val sumAge = df.selectExpr("sum(age)").collect()(0)(0)
println(s"Max age: $maxAge")
println(s"Sum age: $sumAge")
// 关闭SparkSession
spark.stop()
}
}
```
其中,`url`、`user`、`password`和`table`是连接MySQL数据库和操作的表的相关信息。`df`代表从MySQL中读取的数据,`insertDF`是要插入的数据。`max`和`sum`函数分别计算age的最大值和总和。最后,程序打印出结果并关闭SparkSession。
需要注意的是,上述代码中的`mysql-connector-java.jar`需要与MySQL版本匹配,否则可能会出现连接错误。
阅读全文