二、使用sparkSql将student.txt数据写入mysql(30分) 字段如下: s_id,s_name,s_sex,s_birthday,s_class,s_specialty,s_school,s_register_time 创建样例类Student 1.创建sparkSession对象 2.读取数据 3.切分每一行, 4.RDD关联Student 导入隐式转换 import spark.implicits._ 5.将RDD转换成DataFrame 6.将DataFrame注册成表 7.操作student表 ,按照年龄进行降序排列 8.把结果保存在mysql表中 (1)创建Properties对象,配置连接mysql的用户名和密码 (2)写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error默认表存在报错
时间: 2024-02-03 20:15:42 浏览: 74
以下是实现代码:
```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession
case class Student(s_id: String, s_name: String, s_sex: String, s_birthday: String, s_class: String, s_specialty: String, s_school: String, s_register_time: String)
object StudentToMysql {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("StudentToMysql")
.master("local[*]")
.getOrCreate()
// 读取数据
val studentRDD = spark.sparkContext.textFile("student.txt")
// 切分每一行并关联Student
val studentDS = studentRDD.map(_.split(","))
.map(attributes => Student(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4), attributes(5), attributes(6), attributes(7)))
.toDS()
// 将DataFrame注册成表
studentDS.createOrReplaceTempView("student")
// 操作student表,按照年龄进行降序排列
val resultDF = spark.sql("SELECT * FROM student ORDER BY s_birthday DESC")
// 配置连接mysql的用户名和密码
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "123456")
// 把结果保存在mysql表中,overwrite覆盖
resultDF.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/test", "test.student", prop)
spark.stop()
}
}
```
注意:在运行代码之前,需要先在mysql中创建一个名为test的数据库,以及名为student的表,并且表的字段需要与样例类中定义的字段保持一致。
阅读全文