二、使用sparkSql将student.txt数据写入mysql(30分) 字段如下: s_id,s_name,s_sex,s_birthday,s_class,s_specialty,s_school,s_register_time 创建样例类Student 1.创建sparkSession对象 2.读取数据 3.切分每一行, 4.RDD关联Student 导入隐式转换 import spark.implicits._ 5.将RDD转换成DataFrame 6.将DataFrame注册成表 7.操作student表 ,按照年龄进行降序排列 8.把结果保存在mysql表中 (1)创建Properties对象,配置连接mysql的用户名和密码 (2)写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error默认表存在报错
时间: 2024-02-03 19:15:32 浏览: 119
读取student.txt文件内容,每行作为一条数据,创建对应Student对象。
代码如下:
```scala
import org.apache.spark.sql.SparkSession
case class Student(s_id: String, s_name: String, s_sex: String, s_birthday: String, s_class: String, s_specialty: String, s_school: String, s_register_time: String)
object SparkSqlDemo {
def main(args: Array[String]): Unit = {
// 1.创建sparkSession对象
val spark = SparkSession.builder()
.appName("SparkSqlDemo")
.master("local[*]")
.getOrCreate()
// 2.读取数据
val studentRDD = spark.sparkContext.textFile("path/to/student.txt")
// 3.切分每一行
val studentInfo = studentRDD.map(_.split(","))
// 4.RDD关联Student
val students = studentInfo.map(x => Student(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
// 5.将RDD转换成DataFrame
val studentDF = students.toDF()
// 6.将DataFrame注册成表
studentDF.createOrReplaceTempView("student")
// 7.操作student表,按照年龄进行降序排列
val resultDF = spark.sql("select * from student order by s_birthday desc")
// 8.把结果保存在mysql表中
val props = new java.util.Properties
props.setProperty("user", "username")
props.setProperty("password", "password")
resultDF.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/test", "student", props)
spark.stop()
}
}
```
其中,需要将 `path/to/student.txt` 替换为实际的文件路径,将 `username` 和 `password` 替换为实际的 MySQL 用户名和密码,将 `jdbc:mysql://localhost:3306/test` 替换为实际的 MySQL 数据库地址和数据库名。在写入 MySQL 时,我们使用了 `overwrite` 模式,表示如果表已经存在则覆盖,这样可以保证每次运行程序都能得到相同的结果。
阅读全文