在scala中dataframe文件,计算每部电影的平均得分
时间: 2023-11-22 07:54:42 浏览: 95
假设你已经读取了一个名为`ratings`的DataFrame,它包含用户对电影的评分,其中包括列`movieId`和`rating`。你可以按以下方式计算每部电影的平均得分:
```scala
import org.apache.spark.sql.functions._
val avgRatings = ratings.groupBy("movieId").agg(avg("rating").as("avgRating"))
```
这将返回一个名为`avgRatings`的DataFrame,其中包含每部电影的平均得分。该DataFrame具有两列:`movieId`和`avgRating`,其中`movieId`是电影的ID,`avgRating`是该电影的平均得分。
相关问题
spark Scala中合并两个文件,计算每部电影的平均得分是多少 Ratings.csv文件 文件里面的内容包含了每一个用户对于每一部电影的评分。数据格式如下: userId, movieId, rating, timestamp userId: 每个用户的id movieId: 每部电影的id rating: 用户评分,是5星制 timestamp: 自1970年1月1日零点后到用户提交评价的时间的秒数 movies.csv文件 movieId: 每部电影的Id title:每部电影的名字
可以使用Spark的DataFrame API来实现这个功能。首先,我们需要读取两个文件并将它们转换成DataFrame格式:
```scala
val ratings = spark.read.format("csv")
.option("header", "true")
.load("path/to/ratings.csv")
val movies = spark.read.format("csv")
.option("header", "true")
.load("path/to/movies.csv")
```
接下来,我们需要将两个DataFrame进行合并,可以使用`join`操作:
```scala
val joined = ratings.join(movies, Seq("movieId"), "left")
```
这里使用`Seq("movieId")`作为`join`的连接键,表示按照`movieId`这个列进行合并。`"left"`表示使用左连接,即保留`ratings`中的所有记录。
最后,我们可以使用`groupBy`和`agg`操作来计算每部电影的平均得分:
```scala
val result = joined.groupBy("movieId", "title")
.agg(avg("rating").as("avg_rating"))
.sort(desc("avg_rating"))
```
这里使用`groupBy`按照`movieId`和`title`进行分组,然后使用`agg`计算每组的平均得分,并将结果保存在名为`avg_rating`的新列中。最后使用`sort`按照平均得分降序排序。
完整代码如下:
```scala
import org.apache.spark.sql.functions._
val ratings = spark.read.format("csv")
.option("header", "true")
.load("path/to/ratings.csv")
val movies = spark.read.format("csv")
.option("header", "true")
.load("path/to/movies.csv")
val joined = ratings.join(movies, Seq("movieId"), "left")
val result = joined.groupBy("movieId", "title")
.agg(avg("rating").as("avg_rating"))
.sort(desc("avg_rating"))
result.show()
```
用scala进行分析。文件说明 Ratings.csv文件 文件里面的内容包含了每一个用户对于每一部电影的评分。数据格式如下: userId, movieId, rating, timestamp userId: 每个用户的id movieId: 每部电影的id rating: 用户评分,是5星制 timestamp: 自1970年1月1日零点后到用户提交评价的时间的秒数 movies.csv文件 movieId: 每部电影的Id title:每部电影的名字 题目说明 (1) 统计电影总数(去重) (2) 合并两个文件,计算每部电影的平均得分是多少(说明:显示结果为movieId,title,平均得分) (3) 统计得分(平均分)排名前10的电影(输出电影名称) (4) 统计最受欢迎的电影(即评分人数最多的电影),
用Scala进行分析,可以使用Spark框架来处理这些数据。
首先,我们需要导入Spark相关的包和创建SparkSession:
```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Movie Ratings Analysis")
.master("local[*]")
.getOrCreate()
```
然后,我们可以读取Ratings.csv和movies.csv文件,并将它们转化为DataFrame:
```scala
val ratingsDF = spark.read.format("csv")
.option("header", "true")
.load("path/to/Ratings.csv")
val moviesDF = spark.read.format("csv")
.option("header", "true")
.load("path/to/movies.csv")
```
接下来,我们可以开始进行数据分析。
(1)统计电影总数(去重)
我们可以使用DataFrame的distinct方法来去重,并使用count方法来计算总数:
```scala
val totalMovies = moviesDF.select("movieId").distinct().count()
println(s"Total number of movies: $totalMovies")
```
(2)合并两个文件,计算每部电影的平均得分是多少
我们可以使用DataFrame的join方法来将Ratings.csv和movies.csv文件合并,然后使用groupBy和avg方法来计算每部电影的平均得分:
```scala
val combinedDF = ratingsDF.join(moviesDF, "movieId")
val avgRatingsDF = combinedDF.groupBy("movieId", "title")
.avg("rating")
.orderBy("movieId")
.withColumnRenamed("avg(rating)", "averageRating")
avgRatingsDF.show(false)
```
(3)统计得分(平均分)排名前10的电影(输出电影名称)
我们可以使用DataFrame的orderBy和limit方法来排序和限制结果集:
```scala
val top10DF = avgRatingsDF.orderBy($"averageRating".desc).limit(10)
val top10Titles = top10DF.select("title").collect().map(_.getString(0))
println(s"Top 10 movies by average rating: ${top10Titles.mkString(", ")}")
```
(4)统计最受欢迎的电影(即评分人数最多的电影)
我们可以使用DataFrame的count方法来计算每部电影的评分人数,并使用orderBy和limit方法来获取评分人数最多的电影:
```scala
val popularDF = combinedDF.groupBy("movieId", "title")
.count()
.orderBy($"count".desc)
.limit(1)
val popularMovie = popularDF.select("title").collect().map(_.getString(0)).head
val popularCount = popularDF.select("count").collect().map(_.getLong(0)).head
println(s"Most popular movie: $popularMovie ($popularCount ratings)")
```
完整的代码如下:
```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Movie Ratings Analysis")
.master("local[*]")
.getOrCreate()
val ratingsDF = spark.read.format("csv")
.option("header", "true")
.load("path/to/Ratings.csv")
val moviesDF = spark.read.format("csv")
.option("header", "true")
.load("path/to/movies.csv")
val totalMovies = moviesDF.select("movieId").distinct().count()
println(s"Total number of movies: $totalMovies")
val combinedDF = ratingsDF.join(moviesDF, "movieId")
val avgRatingsDF = combinedDF.groupBy("movieId", "title")
.avg("rating")
.orderBy("movieId")
.withColumnRenamed("avg(rating)", "averageRating")
avgRatingsDF.show(false)
val top10DF = avgRatingsDF.orderBy($"averageRating".desc).limit(10)
val top10Titles = top10DF.select("title").collect().map(_.getString(0))
println(s"Top 10 movies by average rating: ${top10Titles.mkString(", ")}")
val popularDF = combinedDF.groupBy("movieId", "title")
.count()
.orderBy($"count".desc)
.limit(1)
val popularMovie = popularDF.select("title").collect().map(_.getString(0)).head
val popularCount = popularDF.select("count").collect().map(_.getLong(0)).head
println(s"Most popular movie: $popularMovie ($popularCount ratings)")
```
阅读全文