用spark分析文件说明 Ratings.csv文件 文件里面的内容包含了每一个用户对于每一部电影的评分。数据格式如下: userId, movieId, rating, timestamp userId: 每个用户的id movieId: 每部电影的id rating: 用户评分,是5星制 timestamp: 自1970年1月1日零点后到用户提交评价的时间的秒数 movies.csv文件 movieId: 每部电影的Id title:每部电影的名字 题目说明 (1) 统计电影总数 (2) 合并两个文件,计算每部电影的平均得分是多少(说明:显示结果为movieId,title,平均得分) (3) 统计得分排名前10的电影(输出电影名称) (4) 统计最受欢迎的电影(即评分人数最多的电影), 思路:4.1对Ratings.csv文件操作,统计每部电影的评分人数 4.2对评分人数进行排序 4.3在movies.csv文件中过滤出评分人数最多的电影名称 (5)编程实现RDD转DataFrame,并选择一个合理的SQL分析。
时间: 2024-03-02 11:49:02 浏览: 25
1. 统计电影总数
使用Spark的RDD API读取movies.csv文件,然后通过count()方法统计电影总数。代码如下:
```python
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("Movie Analysis")
sc = SparkContext(conf=conf)
movies_rdd = sc.textFile("movies.csv")
movies_count = movies_rdd.count()
print("Total number of movies:", movies_count)
```
2. 合并两个文件,计算每部电影的平均得分是多少
使用Spark的RDD API读取Ratings.csv文件,然后通过map()方法将每条记录转换为(movieId, (rating, 1))的键值对形式,其中第一个元素为电影ID,第二个元素为元组,包含评分值和1,表示一个用户对该电影进行了一次评分。接着使用reduceByKey()方法将同一个电影ID的所有评分记录合并,并计算出该电影的总评分和评分人数。最后使用join()方法将该结果与movies.csv文件中的电影名称进行连接,得到每部电影的平均得分。代码如下:
```python
ratings_rdd = sc.textFile("ratings.csv")
movie_ratings_rdd = ratings_rdd.map(lambda x: (x.split(",")[1], (float(x.split(",")[2]), 1))) \
.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
.mapValues(lambda x: x[0]/x[1])
movies_rdd = sc.textFile("movies.csv")
movie_info_rdd = movies_rdd.map(lambda x: (x.split(",")[0], x.split(",")[1]))
movie_avg_ratings_rdd = movie_ratings_rdd.join(movie_info_rdd) \
.map(lambda x: (x[0], x[1][1], x[1][0]))
print("Movies with average rating:")
for movie in movie_avg_ratings_rdd.collect():
print(movie)
```
3. 统计得分排名前10的电影
对上一步得到的结果按照评分排序,并取前10个电影。代码如下:
```python
top_10_movies = movie_avg_ratings_rdd.sortBy(lambda x: x[2], ascending=False).take(10)
print("Top 10 movies by rating:")
for movie in top_10_movies:
print(movie[1])
```
4. 统计最受欢迎的电影
4.1 对Ratings.csv文件操作,统计每部电影的评分人数
使用与第2步相同的方法,得到每部电影的评分人数。代码如下:
```python
ratings_count_rdd = ratings_rdd.map(lambda x: (x.split(",")[1], 1)) \
.reduceByKey(lambda x, y: x+y)
```
4.2 对评分人数进行排序
对评分人数进行排序,取出评分人数最多的电影ID。代码如下:
```python
most_popular_movie_id = ratings_count_rdd.sortBy(lambda x: x[1], ascending=False).first()[0]
```
4.3 在movies.csv文件中过滤出评分人数最多的电影名称
使用filter()方法过滤出电影ID等于most_popular_movie_id的电影,并得到其电影名称。代码如下:
```python
most_popular_movie_title = movies_rdd.filter(lambda x: x.split(",")[0] == most_popular_movie_id) \
.map(lambda x: x.split(",")[1]).first()
print("Most popular movie:", most_popular_movie_title)
```
5. 编程实现RDD转DataFrame,并选择一个合理的SQL分析。
将上述结果转换为DataFrame,并使用Spark SQL分析。代码如下:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Movie Analysis").getOrCreate()
ratings_count_df = ratings_count_rdd.toDF(["movie_id", "ratings_count"])
movie_avg_ratings_df = movie_avg_ratings_rdd.toDF(["movie_id", "movie_title", "avg_rating"])
ratings_count_df.createOrReplaceTempView("ratings_count")
movie_avg_ratings_df.createOrReplaceTempView("movie_avg_ratings")
most_popular_movie_df = spark.sql("SELECT movie_title FROM movie_avg_ratings WHERE movie_id = (SELECT movie_id FROM ratings_count ORDER BY ratings_count DESC LIMIT 1)")
print("Most popular movie (using Spark SQL):")
most_popular_movie_df.show()
```
以上代码中,首先使用toDF()方法将RDD转换为DataFrame,并命名列名。然后使用createOrReplaceTempView()方法将DataFrame注册为临时表,方便后续使用Spark SQL进行查询。最后使用Spark SQL语句查询评分人数最多的电影名称。