MapReduce Case Study: Movie Review Analysis (Year, Movie ID, Movie Title, Average Rating)
Date: 2023-08-15
First, the review data needs to be stored in HDFS in a format suitable for MapReduce processing, for example CSV. Assume each line of the dataset contains the following fields: user ID, movie ID, rating, timestamp.
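As an illustration, a ratings file in this layout might look like the lines below (the concrete IDs and values are made up, in the style of the MovieLens ratings data):

```
196,242,3.0,881250949
186,302,3.0,891717742
22,377,1.0,878887116
```

It could then be uploaded with `hadoop fs -put ratings.csv /input/` (the file and directory names here are placeholders).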
Next, we use the MapReduce framework to write code that analyzes the data and computes each movie's average rating. Here is pseudocode for the MapReduce program:
Mapper phase:
1. Read each input line
2. Extract the year (from the timestamp), the movie ID, and the rating
3. Emit a key-value pair whose key combines the year and the movie ID, and whose value is the rating (the year must travel with the movie ID, or the reducer cannot recover it)
Reducer phase:
1. Receive each (year, movie ID) key together with its list of ratings
2. Compute the average rating
3. Emit the year as the key, and the movie ID, movie title, and average rating as the value
Below is a detailed implementation of the MapReduce program.
Mapper implementation:
```java
import java.io.IOException;
import java.util.Calendar;
import java.util.TimeZone;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MovieRatingMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {
    // Reused Writable instances to avoid per-record allocation
    private final Text outKey = new Text();
    private final FloatWritable rating = new FloatWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line: userId,movieId,rating,timestamp
        String[] fields = value.toString().split(",");
        if (fields.length == 4) {
            try {
                int year = getYearFromTimestamp(Long.parseLong(fields[3]));
                // Composite key "year,movieId", so the reducer still knows the year
                outKey.set(year + "," + fields[1]);
                rating.set(Float.parseFloat(fields[2]));
                context.write(outKey, rating);
            } catch (NumberFormatException e) {
                // Skip malformed records
            }
        }
    }

    private int getYearFromTimestamp(long timestamp) {
        // Timestamps are Unix seconds; Calendar expects milliseconds
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        cal.setTimeInMillis(timestamp * 1000L);
        return cal.get(Calendar.YEAR);
    }
}
```
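The timestamp handling is easy to get subtly wrong (seconds vs. milliseconds, time zones), so it is worth checking outside Hadoop. A minimal standalone sketch of the same conversion, assuming Unix timestamps in seconds interpreted as UTC:

```java
import java.util.Calendar;
import java.util.TimeZone;

public class YearFromTimestamp {
    // Same conversion as the mapper: Unix seconds -> calendar year (UTC)
    static int yearOf(long unixSeconds) {
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        cal.setTimeInMillis(unixSeconds * 1000L);
        return cal.get(Calendar.YEAR);
    }

    public static void main(String[] args) {
        // 881250949 seconds after the epoch falls in November 1997
        System.out.println(yearOf(881250949L));
    }
}
```

Forgetting the `* 1000L` would interpret the value as milliseconds and collapse every record into 1970.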
Reducer implementation:
```java
import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MovieRatingReducer extends Reducer<Text, FloatWritable, IntWritable, Text> {
    private final IntWritable outYear = new IntWritable();
    private final Text movieData = new Text();

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        float totalRating = 0;
        for (FloatWritable val : values) {
            totalRating += val.get();
            count++;
        }
        float averageRating = totalRating / count;

        // The mapper's composite key is "year,movieId"
        String[] parts = key.toString().split(",");
        int year = Integer.parseInt(parts[0]);
        int movieId = Integer.parseInt(parts[1]);

        outYear.set(year);
        movieData.set(movieId + "," + getMovieNameFromId(movieId) + "," + averageRating);
        context.write(outYear, movieData);
    }

    private String getMovieNameFromId(int movieId) {
        // TODO: look up the movie title for this ID, e.g. from a movies
        // lookup table loaded in setup()
        return "";
    }
}
```
In the Reducer above, one helper method still needs a real implementation: getMovieNameFromId should return a movie's title given its ID, typically by joining against a separate movies dataset. The year does not have to be recovered separately, since the mapper carries it in the composite key.
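One common way to implement getMovieNameFromId is to load a small movies file into an in-memory map in the reducer's setup() method. The parsing can be sketched standalone; the `movieId,title` line format below is an assumption about how such a file might look, not part of the original dataset description:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

public class MovieTitleLookup {
    // Parse "movieId,title" lines into an id -> title map.
    // A title may itself contain commas, so split on the first comma only.
    static Map<Integer, String> load(Reader source) throws IOException {
        Map<Integer, String> titles = new HashMap<>();
        try (BufferedReader br = new BufferedReader(source)) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] parts = line.split(",", 2);
                if (parts.length == 2) {
                    titles.put(Integer.parseInt(parts[0].trim()), parts[1].trim());
                }
            }
        }
        return titles;
    }

    public static void main(String[] args) throws IOException {
        Map<Integer, String> titles =
                load(new StringReader("1,Toy Story\n2,Jumanji\n"));
        System.out.println(titles.get(1)); // Toy Story
    }
}
```

In a real job the movies file would be shipped to each reducer, for example via `job.addCacheFile(...)`, read once in setup(), and getMovieNameFromId would become a lookup in the resulting map.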
Finally, we need a Driver class that wires the Mapper and Reducer together and writes the results to HDFS.
Driver implementation:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieRatingDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Movie Rating Analysis");
        job.setJarByClass(MovieRatingDriver.class);
        job.setMapperClass(MovieRatingMapper.class);
        job.setReducerClass(MovieRatingReducer.class);
        // Map output types differ from the final output types,
        // so both pairs must be declared
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
The program can then be run with the following command:
```
hadoop jar movie-rating.jar MovieRatingDriver input output
```
Here, input is the HDFS path of the input dataset and output is the HDFS path for the results. After a successful run, the output path should contain files listing, per record, the year, movie ID, movie title, and average rating.