3. Use Spark SQL to convert student.txt into a DataFrame and perform the related operations (40 points)

Fields: s_id, s_name, s_sex, s_birthday, s_class, s_specialty, s_school, s_register_time

Create the case class Student, then:

1. Create a SparkSession
2. Get the SparkContext object and set the log level: sc.setLogLevel("WARN")
3. Load the data
4. Split each line and wrap the fields in a Student instance
5. Load the data into Row objects
6. Create a DataFrame from the Row RDD and the schema
7. Display the DataFrame contents with DSL operations
8. Register the DataFrame as a temporary table
9. Query it with SQL statements
10. Release resources
Posted: 2024-02-05 10:11:00
The code implementation is as follows:
```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
case class Student(s_id: String, s_name: String, s_sex: String, s_birthday: String,
                   s_class: String, s_specialty: String, s_school: String, s_register_time: String)

object StudentDF {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkSession
    val spark = SparkSession.builder()
      .appName("StudentDF")
      .master("local[*]")
      .getOrCreate()

    // 2. Get the SparkContext object and set the log level
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    // 3. Load the data
    val data = sc.textFile("student.txt")

    // 4. Split each line and wrap the fields in a Student instance
    val studentRDD = data.map(line => {
      val fields = line.split(",")
      Student(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5), fields(6), fields(7))
    })

    // 5. Load the data into Row objects
    val studentRowRDD = studentRDD.map(student => {
      Row(student.s_id, student.s_name, student.s_sex, student.s_birthday,
        student.s_class, student.s_specialty, student.s_school, student.s_register_time)
    })

    // 6. Create a DataFrame from studentRowRDD and the schema
    val schema = StructType(List(
      StructField("s_id", StringType, true),
      StructField("s_name", StringType, true),
      StructField("s_sex", StringType, true),
      StructField("s_birthday", StringType, true),
      StructField("s_class", StringType, true),
      StructField("s_specialty", StringType, true),
      StructField("s_school", StringType, true),
      StructField("s_register_time", StringType, true)
    ))
    val studentDF = spark.createDataFrame(studentRowRDD, schema)

    // 7. Display the DataFrame contents with DSL operations
    studentDF.show()

    // 8. Register the DataFrame as a temporary table
    studentDF.createOrReplaceTempView("student")

    // 9. Query with SQL (the specialty value matches the Chinese text stored in the data)
    val resultDF = spark.sql("select s_name,s_sex,s_class from student where s_specialty='计算机科学与技术'")
    resultDF.show()

    // 10. Release resources
    spark.stop()
  }
}
```
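Step 7 in the code only calls `show()`; the DSL supports much richer column-level queries without writing any SQL strings. A minimal sketch, assuming the `studentDF` built above (the filter value "女" is a hypothetical example of what the data might contain):

```scala
import org.apache.spark.sql.functions.col

// Select a subset of columns
studentDF.select("s_name", "s_class").show()

// Filter rows by a column value (hypothetical example value)
studentDF.filter(col("s_sex") === "女").show()

// Aggregate: count students per class
studentDF.groupBy("s_class").count().show()
```

These calls return new DataFrames, so they can be chained freely before the final `show()`.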
Note: student.txt must be placed in the project root directory.
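As an aside, building `Row` objects plus an explicit schema is not the only route: since `Student` is a case class, the RDD can be converted directly with `toDF` via Spark's implicits. A sketch assuming the same `spark` session and `studentRDD` from the code above:

```scala
import spark.implicits._

// Column names and types are inferred from the Student case class fields
val studentDF2 = studentRDD.toDF()
studentDF2.printSchema()
```

The explicit-schema approach in the main code remains useful when column names or nullability must differ from the case class, or when the input has no case class at all.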