用scala进行分析。文件说明 Ratings.csv文件 文件里面的内容包含了每一个用户对于每一部电影的评分。数据格式如下: userId, movieId, rating, timestamp userId: 每个用户的id movieId: 每部电影的id rating: 用户评分,是5星制 timestamp: 自1970年1月1日零点后到用户提交评价的时间的秒数 movies.csv文件 movieId: 每部电影的Id title:每部电影的名字 题目说明 (1) 统计电影总数(去重) (2) 合并两个文件,计算每部电影的平均得分是多少(说明:显示结果为movieId,title,平均得分) (3) 统计得分(平均分)排名前10的电影(输出电影名称) (4) 统计最受欢迎的电影(即评分人数最多的电影),
时间: 2024-03-02 11:49:23 浏览: 163
用Scala进行分析,可以使用Spark框架来处理这些数据。
首先,我们需要导入Spark相关的包和创建SparkSession:
```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Movie Ratings Analysis")
.master("local[*]")
.getOrCreate()
```
然后,我们可以读取Ratings.csv和movies.csv文件,并将它们转化为DataFrame:
```scala
val ratingsDF = spark.read.format("csv")
.option("header", "true")
.load("path/to/Ratings.csv")
val moviesDF = spark.read.format("csv")
.option("header", "true")
.load("path/to/movies.csv")
```
接下来,我们可以开始进行数据分析。
(1)统计电影总数(去重)
我们可以使用DataFrame的distinct方法来去重,并使用count方法来计算总数:
```scala
val totalMovies = moviesDF.select("movieId").distinct().count()
println(s"Total number of movies: $totalMovies")
```
(2)合并两个文件,计算每部电影的平均得分是多少
我们可以使用DataFrame的join方法来将Ratings.csv和movies.csv文件合并,然后使用groupBy和avg方法来计算每部电影的平均得分:
```scala
val combinedDF = ratingsDF.join(moviesDF, "movieId")
val avgRatingsDF = combinedDF.groupBy("movieId", "title")
.avg("rating")
.orderBy("movieId")
.withColumnRenamed("avg(rating)", "averageRating")
avgRatingsDF.show(false)
```
(3)统计得分(平均分)排名前10的电影(输出电影名称)
我们可以使用DataFrame的orderBy和limit方法来排序和限制结果集:
```scala
val top10DF = avgRatingsDF.orderBy($"averageRating".desc).limit(10)
val top10Titles = top10DF.select("title").collect().map(_.getString(0))
println(s"Top 10 movies by average rating: ${top10Titles.mkString(", ")}")
```
(4)统计最受欢迎的电影(即评分人数最多的电影)
我们可以使用DataFrame的count方法来计算每部电影的评分人数,并使用orderBy和limit方法来获取评分人数最多的电影:
```scala
val popularDF = combinedDF.groupBy("movieId", "title")
.count()
.orderBy($"count".desc)
.limit(1)
val popularMovie = popularDF.select("title").collect().map(_.getString(0)).head
val popularCount = popularDF.select("count").collect().map(_.getLong(0)).head
println(s"Most popular movie: $popularMovie ($popularCount ratings)")
```
完整的代码如下:
```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Movie Ratings Analysis")
.master("local[*]")
.getOrCreate()
val ratingsDF = spark.read.format("csv")
.option("header", "true")
.load("path/to/Ratings.csv")
val moviesDF = spark.read.format("csv")
.option("header", "true")
.load("path/to/movies.csv")
val totalMovies = moviesDF.select("movieId").distinct().count()
println(s"Total number of movies: $totalMovies")
val combinedDF = ratingsDF.join(moviesDF, "movieId")
val avgRatingsDF = combinedDF.groupBy("movieId", "title")
.avg("rating")
.orderBy("movieId")
.withColumnRenamed("avg(rating)", "averageRating")
avgRatingsDF.show(false)
val top10DF = avgRatingsDF.orderBy($"averageRating".desc).limit(10)
val top10Titles = top10DF.select("title").collect().map(_.getString(0))
println(s"Top 10 movies by average rating: ${top10Titles.mkString(", ")}")
val popularDF = combinedDF.groupBy("movieId", "title")
.count()
.orderBy($"count".desc)
.limit(1)
val popularMovie = popularDF.select("title").collect().map(_.getString(0)).head
val popularCount = popularDF.select("count").collect().map(_.getLong(0)).head
println(s"Most popular movie: $popularMovie ($popularCount ratings)")
```
阅读全文