用Spark SQL分析,找出每种类型的评分最高10部电影
时间: 2024-03-23 22:40:30 浏览: 15
好的,下面是使用Spark SQL分析,找出每种类型的评分最高10部电影的示例代码:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// 读取movies、ratings和movie_genres三张表的数据
val movies = spark.read.format("csv").option("header", "true").load("movies.csv")
val ratings = spark.read.format("csv").option("header", "true").load("ratings.csv")
val movieGenres = spark.read.format("csv").option("header", "true").load("movie_genres.csv")
// 将movie_genres表进行展开,将每个电影的每个类型都展开成一行
val explodedGenres = movieGenres.withColumn("genre", explode(split(col("genres"), "\\|")))
.drop("genres")
// 将ratings和explodedGenres表与movies表进行JOIN,获得每部电影的评分和类型信息
val joinedData = ratings.join(movies, Seq("movie_id")).join(explodedGenres, Seq("movie_id"))
// 计算每种类型的电影的评分,并按照类型和评分降序排列
val genreRatings = joinedData.groupBy("genre", "title")
.agg(avg("rating").as("average_rating"), count("rating").as("rating_count"))
.withColumn("rank", dense_rank().over(Window.partitionBy("genre").orderBy(desc("average_rating"))))
.filter($"rank" <= 10)
.orderBy("genre", "rank")
// 打印结果
genreRatings.show()
```
在这个代码中,我们首先读取了movies、ratings和movie_genres三张表的数据,然后将movie_genres表进行展开,将每个电影的每个类型都展开成一行。接下来,我们将ratings和explodedGenres表与movies表进行JOIN,获得每部电影的评分和类型信息。
然后,我们按照类型和评分降序排列,计算每种类型的电影的评分,并使用窗口函数dense_rank()计算每种类型的电影评分排名。最后,我们筛选出每种类型评分最高的10部电影,并按照类型和排名升序排列。
注意,在实际使用时需要根据实际数据表结构来修改代码。