.使用Spark SQL分析,找出每种类型的评分最高10部电影
时间: 2024-03-24 11:41:30 浏览: 66
可以使用Spark SQL来实现这个需求,具体代码如下:
```scala
import org.apache.spark.sql.SparkSession
object TopMovies {
case class Movie(movieId: Int, title: String, genres: String)
case class Rating(userId: Int, movieId: Int, rating: Double)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("TopMovies").getOrCreate()
import spark.implicits._
// 读取电影数据和评分数据
val movies = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("path/to/movies.csv").as[Movie]
val ratings = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("path/to/ratings.csv").as[Rating]
// 注册表
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")
// 使用Spark SQL查询每种类型的评分最高10部电影
val topMovies = spark.sql("SELECT genres, title, avg(rating) as avg_rating FROM movies JOIN ratings ON movies.movieId = ratings.movieId GROUP BY genres, title ORDER BY genres, avg_rating DESC")
topMovies.createOrReplaceTempView("topMovies")
val result = spark.sql("SELECT genres, title, avg_rating FROM (SELECT genres, title, avg_rating, ROW_NUMBER() OVER (PARTITION BY genres ORDER BY avg_rating DESC) as rank FROM topMovies) WHERE rank <= 10")
result.show()
spark.stop()
}
}
```
其中,假设电影数据的格式为`movieId, title, genres`,每行记录表示电影的ID、名称、类型等信息;评分数据的格式为`userId, movieId, rating`,每行记录表示用户对电影的评分。首先,将电影数据和评分数据读入DataFrame,并分别注册为表。然后,使用Spark SQL查询每种类型的电影中评分最高的10部,并输出结果。需要注意的是,这里使用了窗口函数`ROW_NUMBER()`来计算每种类型电影的评分排名,以便筛选排名前10的电影。最后,将结果输出到控制台。
需要注意的是,这里假设电影数据和评分数据都存储在CSV文件中,如果数据存储在其他格式的文件或数据源中,需要根据实际情况进行修改。
阅读全文