使用spark SQL分析,找出每种类型的评分最高10部电影
时间: 2024-03-23 14:39:33 浏览: 78
好的,我可以帮你回答这个问题。首先,需要加载movies.csv文件和ratings.csv文件,并转换为DataFrame。然后,我们需要对数据进行处理,包括合并两个DataFrame、按电影类型分组、计算每个电影的平均评分、按平均评分排序、选取每种类型的评分最高的10部电影。最后,我们将结果打印出来。
以下是代码示例:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
# 创建SparkSession
spark = SparkSession.builder.appName("MovieAnalysis").getOrCreate()
# 加载数据
movies_df = spark.read.csv("movies.csv", header=True, inferSchema=True)
ratings_df = spark.read.csv("ratings.csv", header=True, inferSchema=True)
# 合并两个DataFrame
movie_ratings_df = movies_df.join(ratings_df, "movieId", "inner")
# 按电影类型分组,计算每个电影的平均评分,按平均评分排序,选取每种类型的评分最高的10部电影
top_movies_df = movie_ratings_df.groupBy("genres", "title") \
.agg({"rating": "avg", "timestamp": "count"}) \
.withColumnRenamed("avg(rating)", "avg_rating") \
.withColumnRenamed("count(timestamp)", "num_ratings") \
.orderBy(["genres", desc("avg_rating"), desc("num_ratings")]) \
.groupBy("genres") \
.agg({"title": "collect_list"}) \
.withColumnRenamed("collect_list(title)", "top_movies") \
.select("genres", "top_movies") \
.limit(10)
# 打印结果
top_movies_df.show(truncate=False)
# 停止SparkSession
spark.stop()
```
我们首先使用`read.csv`方法加载文件,并设置`header=True`和`inferSchema=True`以自动推断数据类型。然后,我们使用`join`方法将两个DataFrame合并,连接键为`movieId`。接下来,我们使用`groupBy`方法对电影类型和电影名称进行分组,使用`agg`方法计算每个电影的平均评分和评论次数,并使用`withColumnRenamed`方法重命名列名。然后,我们使用`orderBy`方法按电影类型、平均评分和评论次数进行排序,然后再次使用`groupBy`方法按电影类型进行分组,使用`agg`方法将每种类型的评分最高的10部电影收集到一个列表中,并使用`select`方法选取需要的列。最后,我们使用`limit`方法限制结果集的大小,并使用`show`方法打印结果。
希望这可以回答你的问题。
阅读全文