1.打开右侧命令行窗口,在 MySQL 数据库中新建数据库 sparktest,再建表 employee,字段及数据如下所示; 字段名 数据类型 id int(4) name char(20) gender char(4) age int(4) 数据如下: 1 Alice F 22 2 John M 25 MySQL 数据库连接信息 账号: root 密码:123123 地址:127.0.0.1 端口:3306 启动命令:service mysql start 2.打开代码文件窗口,配置 Spark 通过 JDBC 连接数据库 MySQL ,编程实现利用 DataFrame 插入下列数据到 MySQL,最后打印出 age 的最大值和 age 的总和。 数据如下: 3 Mary F 26 4 Tom M 23 输出格式如下: age->max,age->sum
时间: 2024-02-16 18:03:19 浏览: 180
SparkTest:Intellij Idea中的Spark Scala项目样本
好的,您的问题是如何使用 Spark 通过 JDBC 连接 MySQL 数据库,利用 DataFrame 插入数据,并计算 age 的最大值和总和。
首先,您需要在 Spark 中通过 JDBC 连接 MySQL 数据库。可以使用以下代码来实现:
```scala
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/sparktest")
.option("dbtable", "employee")
.option("user", "root")
.option("password", "123123")
.load()
```
这里的 url、user 和 password 参数需要替换为您自己 MySQL 数据库的连接信息。
然后,您可以通过 DataFrame 插入数据。可以使用以下代码来实现:
```scala
import spark.implicits._
val data = Seq(
(3, "Mary", "F", 26),
(4, "Tom", "M", 23)
).toDF("id", "name", "gender", "age")
data.write
.mode("append")
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/sparktest")
.option("dbtable", "employee")
.option("user", "root")
.option("password", "123123")
.save()
```
这里的 url、user 和 password 参数需要替换为您自己 MySQL 数据库的连接信息。
最后,您可以使用 DataFrame 的聚合函数来计算 age 的最大值和总和。可以使用以下代码来实现:
```scala
val result = jdbcDF.agg(max("age"), sum("age")).head()
println(s"age->${result.getInt(0)},age->${result.getInt(1)}")
```
这里的 max() 和 sum() 函数是 DataFrame 的内置聚合函数。
完整的代码如下:
```scala
import org.apache.spark.sql.functions._
// 连接 MySQL 数据库
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/sparktest")
.option("dbtable", "employee")
.option("user", "root")
.option("password", "123123")
.load()
// 插入数据
val data = Seq(
(3, "Mary", "F", 26),
(4, "Tom", "M", 23)
).toDF("id", "name", "gender", "age")
data.write
.mode("append")
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/sparktest")
.option("dbtable", "employee")
.option("user", "root")
.option("password", "123123")
.save()
// 计算 age 的最大值和总和
val result = jdbcDF.agg(max("age"), sum("age")).head()
println(s"age->${result.getInt(0)},age->${result.getInt(1)}")
```
输出格式应该如下:
```
age->26,age->96
```
阅读全文