请利用https://www.kaggle.com/datasets/whenamancodes/popular movies-datasets-58000-movies?resource=download 中包含文件:genome-scores.csv, genome-tags.csv, links.csv, movies.csv, ratings.csv and tags的数据,设计数据处理与分析任务(挖掘出有价值的信息,并根据返回结果 简要分析说明),要求分析各类型电影数和平均评分,采用 2 种不同方式完成。可采用的组件有 MapReduce、HBase、 Hive、Spark core 和 Spark SQL 等;(HiveQL 与基于 Hive 的 Spark SQL 不能同时使用) 3、 相关数据文件必须上传到 HDFS 分布式文件系统上存储,数据处理分析结果需要打 印输出到屏幕上并且导出文件保存到 HDFS 上;(输出结果包含非常多条记录时,请只打印 前 10 条,但是导出到文件的记录须是完整的)
时间: 2024-04-02 13:36:12 浏览: 100
首先,我们需要将数据文件上传到 HDFS 上。假设我们将所有文件都上传到了 HDFS 的 `/movies` 目录下。
然后,我们可以采用两种不同的方式来完成各类型电影数和平均评分的分析。
方式一:MapReduce
我们可以采用 MapReduce 进行数据处理和分析。具体步骤如下:
1. 编写 MapReduce 程序,读取 `movies.csv` 和 `ratings.csv` 文件,将数据按照电影类型进行分类,并计算每种类型电影的总数和平均评分。
2. 运行 MapReduce 程序,获得分析结果。
3. 将分析结果导出到 HDFS 上的某个文件中,以便后续使用。
下面是一个简单的示例 MapReduce 程序:
```java
public class MovieTypeCountAndAvgRating {
public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable> {
private Text movieType = new Text();
private DoubleWritable rating = new DoubleWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
// 过滤掉无效数据
if (fields.length != 3) {
return;
}
String movieId = fields[0];
String ratingValue = fields[2];
// 读取电影类型
Path typeFilePath = new Path("hdfs://localhost:9000/movies/movies.csv");
FileSystem fs = FileSystem.get(context.getConfiguration());
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(typeFilePath)));
String typeLine = "";
while ((typeLine = br.readLine()) != null) {
String[] typeFields = typeLine.split(",");
if (typeFields.length != 3) {
continue;
}
if (typeFields[0].equals(movieId)) {
movieType.set(typeFields[2]);
break;
}
}
br.close();
rating.set(Double.parseDouble(ratingValue));
context.write(movieType, rating);
}
}
public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0;
int count = 0;
for (DoubleWritable value : values) {
sum += value.get();
count++;
}
context.write(key, new DoubleWritable(sum / count));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "MovieTypeCountAndAvgRating");
job.setJarByClass(MovieTypeCountAndAvgRating.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/movies/ratings.csv"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/movies/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
```
我们可以通过运行以下命令来提交 MapReduce 作业:
```bash
hadoop jar movie-type-count-and-avg-rating.jar MovieTypeCountAndAvgRating /movies/ratings.csv /movies/output
```
其中,`movie-type-count-and-avg-rating.jar` 是编译后的 MapReduce 程序,`/movies/ratings.csv` 是输入文件路径,`/movies/output` 是输出目录路径。
我们可以通过以下命令来查看输出结果:
```bash
hdfs dfs -cat /movies/output/*
```
方式二:Spark SQL
我们也可以采用 Spark SQL 进行数据处理和分析。具体步骤如下:
1. 在 Spark 中创建 `DataFrame`,读取 `movies.csv` 和 `ratings.csv` 文件的数据。
2. 将 `DataFrame` 转换为临时表,使用 Spark SQL 进行查询,计算每种类型电影的总数和平均评分。
3. 将查询结果导出到 HDFS 上的某个文件中,以便后续使用。
下面是一个简单的示例 Spark SQL 程序:
```scala
import org.apache.spark.sql.SparkSession
object MovieTypeCountAndAvgRating {
def main(args: Array[String]) {
val spark = SparkSession.builder.appName("MovieTypeCountAndAvgRating").getOrCreate()
// 读取数据文件
val movies = spark.read.format("csv").option("header", "true").load("hdfs://localhost:9000/movies/movies.csv")
val ratings = spark.read.format("csv").option("header", "true").load("hdfs://localhost:9000/movies/ratings.csv")
// 将 DataFrame 转换为临时表
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")
// 使用 Spark SQL 进行查询
val result = spark.sql("SELECT movies.genres, COUNT(DISTINCT movies.movieId) AS count, AVG(ratings.rating) AS avg_rating FROM movies JOIN ratings ON movies.movieId = ratings.movieId GROUP BY movies.genres ORDER BY count DESC")
// 将查询结果导出到 HDFS 上的某个文件中
result.write.format("csv").option("header", "true").mode("overwrite").save("hdfs://localhost:9000/movies/output")
spark.stop()
}
}
```
我们可以通过运行以下命令来提交 Spark 作业:
```bash
spark-submit --class MovieTypeCountAndAvgRating movie-type-count-and-avg-rating.jar
```
其中,`movie-type-count-and-avg-rating.jar` 是编译后的 Spark SQL 程序。
我们可以通过以下命令来查看输出结果:
```bash
hdfs dfs -cat /movies/output/*
```
无论采用哪种方式进行数据处理和分析,最终得到的结果应该包含每种类型电影的总数和平均评分。我们可以根据这些结果进行进一步的分析,比如找出最受欢迎的电影类型或者评分最高的电影类型等。