请利用https://www.kaggle.com/datasets/whenamancodes/popular movies-datasets-58000-movies?resource=download 中包含文件:genome-tags.csv, links.csv, movies.csv, ratings.csv and tags.csv的数据,要求编程分析不同体裁电影口碑情况,分析哪种电影体裁口碑比较好,采用的组件有 MapReduce、HBase、 Hive、Spark core 和 Spark SQL 等;(HiveQL 与基于 Hive 的 Spark SQL 不能同时使用) 3、 相关数据文件必须上传到 HDFS 分布式文件系统上存储,数据处理分析结果需要打 印输出到屏幕上并且导出文件保存到 HDFS 上;(输出结果包含非常多条记录时,请只打印 前 10 条,但是导出到文件的记录须是完整的)
时间: 2024-04-02 20:34:34 浏览: 104
好的,我会根据你的要求进行分析。首先,我需要将数据文件上传到HDFS分布式文件系统上,并进行相关的数据处理和分析。
以下是基于Spark SQL的分析过程:
1. 加载数据文件
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("movie_analysis").getOrCreate()
movies = spark.read.csv("hdfs://<HDFS_master>:<port>/movies.csv", header=True, inferSchema=True)
ratings = spark.read.csv("hdfs://<HDFS_master>:<port>/ratings.csv", header=True, inferSchema=True)
tags = spark.read.csv("hdfs://<HDFS_master>:<port>/tags.csv", header=True, inferSchema=True)
```
2. 处理数据
```python
from pyspark.sql.functions import explode, split, udf
from pyspark.sql.types import IntegerType
# 将genres列按照'|'分割并拆分为多行
movies = movies.withColumn("genre", explode(split("genres", "\|")))
# 根据movieId合并ratings和tags数据
ratings_tags = ratings.join(tags, ["userId", "movieId"], "outer")
# 选择需要的列
ratings_tags = ratings_tags.select("movieId", "tag", "rating")
# 将tag列中的内容全部转换为小写字母
ratings_tags = ratings_tags.withColumn("tag", udf(lambda x: x.lower(), StringType())("tag"))
# 将rating列数据类型转换为整数
ratings_tags = ratings_tags.withColumn("rating", ratings_tags["rating"].cast(IntegerType()))
```
3. 分析数据
```python
from pyspark.sql.functions import avg
# 按照genre分组并计算平均评分和平均标签数
genre_ratings_tags = movies.join(ratings_tags, "movieId") \
.groupBy("genre") \
.agg(avg("rating").alias("avg_rating"), avg("tag").alias("avg_tag"))
# 按照平均评分排序并显示前10条数据
genre_ratings_tags.orderBy("avg_rating", ascending=False).show(10)
# 按照平均标签数排序并显示前10条数据
genre_ratings_tags.orderBy("avg_tag", ascending=False).show(10)
```
4. 将结果导出到文件
```python
# 将结果保存到HDFS上
genre_ratings_tags.write.csv("hdfs://<HDFS_master>:<port>/genre_ratings_tags.csv")
```
以上是基于Spark SQL的分析过程。你可以根据需要选择其他组件进行分析。
阅读全文