二、使用sparkSql将student.txt数据写入mysql(30分) 字段如下: s_id,s_name,s_sex,s_birthday,s_class,s_specialty,s_school,s_register_time 创建样例类Student 1.创建sparkSession对象 2.读取数据 3.切分每一行, 4.RDD关联Student 导入隐式转换 import spark.implicits._ 5.将RDD转换成DataFrame 6.将DataFrame注册成表 7.操作student表 ,按照年龄进行降序排列 8.把结果保存在mysql表中 (1)创建Properties对象,配置连接mysql的用户名和密码 (2)写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error
时间: 2024-02-03 15:14:51 浏览: 78
以下是使用Spark SQL将student.txt数据写入MySQL的代码:
```scala
// 1.创建SparkSession对象
val spark = SparkSession.builder()
.appName("Write to MySQL")
.master("local[*]")
.getOrCreate()
// 2.读取数据
val lines = spark.sparkContext.textFile("path/to/student.txt")
// 3.切分每一行, 4.RDD关联Student
val students = lines.map(line => {
val fields = line.split(",")
Student(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5), fields(6), fields(7))
})
// 5.将RDD转换成DataFrame
val studentDF = students.toDF()
// 6.将DataFrame注册成表
studentDF.createOrReplaceTempView("student")
// 7.操作student表 ,按照年龄进行降序排列
val resultDF = spark.sql("SELECT * FROM student ORDER BY s_birthday DESC")
// 8.把结果保存在mysql表中
// (1)创建Properties对象,配置连接MySQL的用户名和密码
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "123456")
// (2)写入MySQL时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error
resultDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test", "student", prop)
```
其中,`Student`样例类的定义如下:
```scala
case class Student(s_id: String, s_name: String, s_sex: String, s_birthday: String, s_class: String, s_specialty: String, s_school: String, s_register_time: String)
```
在代码中,我们首先通过`SparkSession`对象创建了一个本地运行的Spark应用程序,然后读取了`student.txt`文件中的数据,并将每一行切分成一个个字段,再将其关联到一个名为`Student`的样例类中。接着,我们将这些`Student`对象转换成一个DataFrame,并将其注册为一个名为`student`的表。在这个表上,我们执行了一个按照`birthday`字段降序排列的查询,并将结果保存在了MySQL表`student`中。
在将结果保存到MySQL表时,我们通过`Properties`对象设置了连接MySQL的用户名和密码,并指定了写入模式为追加(`SaveMode.Append`)。这表示如果表已经存在,则将新的数据追加到表的末尾;如果表不存在,则会自动创建一个新表。如果需要覆盖原有表的数据,则可以将写入模式设为`overwrite`,如果要忽略重复数据,则可以将模式设为`ignore`,如果要在遇到重复数据时抛出异常,则可以将模式设为`error`。
阅读全文