用虚拟机中的spark进行分析。文件说明 Ratings.csv文件, 文件里面的内容包含了每一个用户对于每一部电影的评分。数据格式如下: userId, movieId, rating, timestamp 。userId: 每个用户的id ,movieId: 每部电影的id, rating: 用户评分,是5星制 ,timestamp: 自1970年1月1日零点后到用户提交评价的时间的秒数 movies.csv文件 movieId: 每部电影的Id title:每部电影的名字 题目说明 (1) 统计电影总数(去重) (2) 合并两个文件,计算每部电影的平均得分是多少(说明:显示结果为movieId,title,平均得分) (3) 统计得分(平均分)排名前10的电影(输出电影名称) (4) 统计最受欢迎的电影(即评分人数最多的电影),
时间: 2024-03-02 15:49:06 浏览: 193
好的,我们可以使用Spark进行数据分析。首先,我们需要启动Spark和创建SparkSession:
```python
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder.appName('movie_analysis').getOrCreate()
```
然后,我们可以读取Ratings.csv和movies.csv文件并创建DataFrame:
```python
# 读取Ratings.csv文件
ratings_df = spark.read.csv("Ratings.csv", header=True, inferSchema=True)
# 读取movies.csv文件
movies_df = spark.read.csv("movies.csv", header=True, inferSchema=True)
```
接下来,我们可以统计电影总数并输出结果:
```python
# 统计电影总数
total_movies = movies_df.select("movieId").distinct().count()
print("电影总数为:", total_movies)
```
然后,我们可以使用join操作合并两个DataFrame,并计算每部电影的平均得分:
```python
# 合并两个DataFrame
merged_df = ratings_df.join(movies_df, on="movieId", how="inner")
# 计算每部电影的平均得分
mean_ratings = merged_df.groupBy("movieId", "title") \
.agg({"rating": "mean"}) \
.withColumnRenamed("avg(rating)", "average_rating")
# 显示结果
mean_ratings.show()
```
接下来,我们可以统计得分(平均分)排名前10的电影:
```python
# 统计得分(平均分)排名前10的电影
top10_ratings = mean_ratings.orderBy("average_rating", ascending=False).limit(10)
# 显示结果
top10_ratings.show(truncate=False)
```
最后,我们可以统计最受欢迎的电影,即评分人数最多的电影,并输出电影名称和评分人数:
```python
# 统计最受欢迎的电影
popular_movies = merged_df.groupBy("movieId", "title") \
.agg({"userId": "count"}) \
.withColumnRenamed("count(userId)", "user_count")
# 找到评分人数最多的电影
most_popular_movie = popular_movies.orderBy("user_count", ascending=False).first()
# 显示结果
print("最受欢迎的电影是:", most_popular_movie.title, ",评分人数为:", most_popular_movie.user_count)
```
完整代码如下:
```python
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder.appName('movie_analysis').getOrCreate()
# 读取Ratings.csv文件
ratings_df = spark.read.csv("Ratings.csv", header=True, inferSchema=True)
# 读取movies.csv文件
movies_df = spark.read.csv("movies.csv", header=True, inferSchema=True)
# 统计电影总数
total_movies = movies_df.select("movieId").distinct().count()
print("电影总数为:", total_movies)
# 合并两个DataFrame
merged_df = ratings_df.join(movies_df, on="movieId", how="inner")
# 计算每部电影的平均得分
mean_ratings = merged_df.groupBy("movieId", "title") \
.agg({"rating": "mean"}) \
.withColumnRenamed("avg(rating)", "average_rating")
# 显示结果
mean_ratings.show()
# 统计得分(平均分)排名前10的电影
top10_ratings = mean_ratings.orderBy("average_rating", ascending=False).limit(10)
# 显示结果
top10_ratings.show(truncate=False)
# 统计最受欢迎的电影
popular_movies = merged_df.groupBy("movieId", "title") \
.agg({"userId": "count"}) \
.withColumnRenamed("count(userId)", "user_count")
# 找到评分人数最多的电影
most_popular_movie = popular_movies.orderBy("user_count", ascending=False).first()
# 显示结果
print("最受欢迎的电影是:", most_popular_movie.title, ",评分人数为:", most_popular_movie.user_count)
```
注意:在使用Spark时,需要确保电脑已经安装了Spark并且设置好了环境变量。
阅读全文