基于spark电影数据分析代码
时间: 2023-08-09 20:11:33 浏览: 128
以下是一个基于 Spark 的电影数据分析的代码示例:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# 创建 SparkSession
spark = SparkSession.builder.appName("MovieAnalysis").getOrCreate()
# 读取电影数据集
movies_df = spark.read.format("csv").option("header", "true").load("movies.csv")
ratings_df = spark.read.format("csv").option("header", "true").load("ratings.csv")
# 电影评分分析
ratings_count_df = ratings_df.groupBy("movieId").count()
ratings_count_df.show()
# 电影类型分析
genres_df = movies_df.select("genres")
genres_df = genres_df.withColumn("genre", explode(split(col("genres"), "\|")))
genre_count_df = genres_df.groupBy("genre").count()
genre_count_df.show()
# 导演分析
directors_df = movies_df.select("movieId", "directors")
ratings_directors_df = ratings_df.join(directors_df, ratings_df.movieId == directors_df.movieId, "left")
ratings_directors_df = ratings_directors_df.drop(directors_df.movieId)
ratings_directors_df = ratings_directors_df.groupBy("directors").agg(avg(col("rating")).alias("avg_rating"))
ratings_directors_df.show()
# 演员分析
actors_df = movies_df.select("movieId", "cast")
ratings_actors_df = ratings_df.join(actors_df, ratings_df.movieId == actors_df.movieId, "left")
ratings_actors_df = ratings_actors_df.drop(actors_df.movieId)
ratings_actors_df = ratings_actors_df.groupBy("cast").agg(avg(col("rating")).alias("avg_rating"))
ratings_actors_df.show()
# 地区分析
countries_df = movies_df.select("movieId", "countries")
ratings_countries_df = ratings_df.join(countries_df, ratings_df.movieId == countries_df.movieId, "left")
ratings_countries_df = ratings_countries_df.drop(countries_df.movieId)
ratings_countries_df = ratings_countries_df.groupBy("countries").agg(avg(col("rating")).alias("avg_rating"))
ratings_countries_df.show()
# 停止 SparkSession
spark.stop()
```
以上代码读取了两个电影数据集(movies.csv 和 ratings.csv),并使用 Spark 进行了电影评分、电影类型、导演、演员和地区等数据的分析。根据具体的数据集和分析需求,代码可能会有所不同。
阅读全文