Scala spark sql 编程实现RDD转DataFrame,并选择一个合理的SQL分析。ratings.csv文件 和movies.csv文件 ratings.csv文件里面的内容包含了每一个用户对于每一部电影的评分。数据格式如下: userId, movieId, rating, timestamp userId: 每个用户的id movieId: 每部电影的id rating: 用户评分,是5星制 timestamp: 自1970年1月1日零点后到用户提交评价的时间的秒数 movies.cs
时间: 2024-02-12 22:07:47 浏览: 145
v文件里面包含了每部电影的详细信息,格式如下: movieId, title, genres movieId: 每部电影的Id title:每部电影的名字 genres: 每部电影所属的类型(可以是多个,用"|"分隔)
以下是Scala Spark SQL编程实现RDD转DataFrame,并进行分析的示例代码:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
object RDD2DataFrame {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("RDD to DataFrame")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
// 定义ratings.csv文件和movies.csv文件的路径
val ratingsFilePath = "path/to/ratings.csv"
val moviesFilePath = "path/to/movies.csv"
// 定义ratings.csv文件的schema
val ratingsSchema = StructType(Seq(
StructField("userId", IntegerType, nullable = false),
StructField("movieId", IntegerType, nullable = false),
StructField("rating", DoubleType, nullable = false),
StructField("timestamp", LongType, nullable = false)
))
// 读取ratings.csv文件并转换为DataFrame
val ratingsDF = spark.read
.option("header", "true")
.schema(ratingsSchema)
.csv(ratingsFilePath)
// 定义movies.csv文件的schema
val moviesSchema = StructType(Seq(
StructField("movieId", IntegerType, nullable = false),
StructField("title", StringType, nullable = false),
StructField("genres", StringType, nullable = false)
))
// 读取movies.csv文件并转换为DataFrame
val moviesDF = spark.read
.option("header", "true")
.schema(moviesSchema)
.csv(moviesFilePath)
// 使用Spark SQL进行分析,找出评分最高的10部电影
ratingsDF.join(moviesDF, Seq("movieId"), "inner")
.groupBy("title")
.agg(avg("rating").as("avg_rating"))
.orderBy(desc("avg_rating"))
.limit(10)
.show()
spark.stop()
}
}
```
SQL分析:从ratings.csv文件中获取每个用户对每部电影的评分数据,从movies.csv文件中获取每部电影的名字和类型数据,将两个数据集合并后,统计每部电影的平均评分并输出评分最高的前十部电影。
阅读全文