编写MapReduce程序,计算输入文件中所有整数的总和、平均值(取整数部分)、最大值、最小值以及最大的三个数字
时间: 2024-05-14 20:12:20 浏览: 133
假设有一个输入文件input.txt,其中每行包含一个整数,编写MapReduce程序来计算总和、平均值(取整数部分)、最大值、最小值以及最大的三个数字。
Mapper阶段:
```
/**
 * Mapper that aggregates the integers it reads in memory and emits the partial results once,
 * from {@link #cleanup}, instead of emitting one record per input line.
 *
 * <p>Each input line is expected to hold a single integer. The mapper emits, under fixed keys:
 * the partial sum ("Sum"), the number of integers read ("Count"), the largest ("Max") and
 * smallest ("Min") integer, and up to three largest integers ("Top3", one record each).
 *
 * <p>Blank lines are skipped. A line that is neither blank nor a valid integer makes
 * {@link Integer#parseInt(String)} throw {@link NumberFormatException}, which fails the task.
 */
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final String SUM = "Sum";
    private static final String COUNT = "Count";
    private static final String MAX = "Max";
    private static final String MIN = "Min";
    private static final String TOP3 = "Top3";
    private static final int TOP_NUM = 3;

    private int sum = 0;
    private int count = 0;
    private int max = Integer.MIN_VALUE;
    private int min = Integer.MAX_VALUE;

    // Min-heap (natural ordering): the head is the SMALLEST retained value, so evicting the head
    // whenever the heap grows past TOP_NUM leaves exactly the TOP_NUM largest values seen so far.
    private final PriorityQueue<Integer> top3 = new PriorityQueue<>(TOP_NUM);

    /**
     * Folds one input line into the running aggregates. Nothing is written to the context here.
     *
     * @param key     input key supplied by the framework (unused)
     * @param value   one line of input, expected to contain a single integer
     * @param context task context (unused until {@link #cleanup})
     * @throws NumberFormatException if the line is not blank and not a valid {@code int}
     * @throws ArithmeticException   if the partial sum no longer fits in an {@code int}
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // Tolerate blank lines (e.g. a trailing newline at the end of the file).
            return;
        }
        int num = Integer.parseInt(line);

        // Fail loudly rather than let the sum wrap around silently.
        sum = Math.addExact(sum, num);
        count++;
        max = Math.max(max, num);
        min = Math.min(min, num);

        top3.offer(num);
        if (top3.size() > TOP_NUM) {
            top3.poll(); // drops the smallest of the TOP_NUM + 1 candidates
        }
    }

    /**
     * Emits the aggregates accumulated by {@link #map}. If no integer was read, nothing is
     * emitted, so the {@code Integer.MIN_VALUE}/{@code Integer.MAX_VALUE} sentinels never leak
     * into the job output as a bogus Max/Min.
     *
     * @param context task context the partial results are written to
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (count == 0) {
            return;
        }
        context.write(new Text(SUM), new IntWritable(sum));
        context.write(new Text(COUNT), new IntWritable(count));
        context.write(new Text(MAX), new IntWritable(max));
        context.write(new Text(MIN), new IntWritable(min));
        // Heap iteration order is unspecified; the order of these records is not significant.
        for (Integer num : top3) {
            context.write(new Text(TOP3), new IntWritable(num));
        }
    }
}
```
Reducer阶段:
```
/**
 * Reducer that merges per-key partial aggregates into the final results.
 *
 * <p>Expected input keys and how their values are combined:
 * <ul>
 *   <li>"Sum" - partial sums are added together;</li>
 *   <li>"Count" - partial counts are added together (NOT the number of partial records);</li>
 *   <li>"Max" / "Min" - the largest / smallest of the partial values;</li>
 *   <li>"Top3" - the three largest of all candidate values, written in ascending order.</li>
 * </ul>
 * Records with any other key are ignored.
 *
 * <p>After all keys are processed, {@link #cleanup} additionally writes "Average", the integer
 * part of {@code Sum / Count}. This needs the "Sum" and "Count" keys to reach the same reducer
 * instance, i.e. the job must run with a single reduce task; otherwise "Average" is not written.
 * Because of this extra record the class must not be registered as a combiner.
 */
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private static final String SUM = "Sum";
    private static final String COUNT = "Count";
    private static final String MAX = "Max";
    private static final String MIN = "Min";
    private static final String TOP3 = "Top3";
    private static final String AVERAGE = "Average";
    private static final int TOP_NUM = 3;

    // Totals remembered across reduce() calls so that cleanup() can derive the average.
    private int totalSum = 0;
    private int totalCount = 0;
    private boolean sumSeen = false;
    private boolean countSeen = false;

    /**
     * Combines all partial values of one key and writes the final value(s) for that key.
     *
     * @param key     one of the aggregate names listed in the class description
     * @param values  partial values emitted for that key
     * @param context task context the final results are written to
     * @throws ArithmeticException if the total sum or count does not fit in an {@code int}
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        String name = key.toString();
        if (SUM.equals(name)) {
            totalSum = addAll(values);
            sumSeen = true;
            context.write(new Text(SUM), new IntWritable(totalSum));
        } else if (COUNT.equals(name)) {
            // Each value is a partial count, so the total is their sum.
            totalCount = addAll(values);
            countSeen = true;
            context.write(new Text(COUNT), new IntWritable(totalCount));
        } else if (MAX.equals(name)) {
            int max = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                max = Math.max(max, value.get());
            }
            context.write(new Text(MAX), new IntWritable(max));
        } else if (MIN.equals(name)) {
            int min = Integer.MAX_VALUE;
            for (IntWritable value : values) {
                min = Math.min(min, value.get());
            }
            context.write(new Text(MIN), new IntWritable(min));
        } else if (TOP3.equals(name)) {
            // Min-heap: evicting the head (smallest) on overflow keeps the TOP_NUM largest.
            PriorityQueue<Integer> top3 = new PriorityQueue<>(TOP_NUM);
            for (IntWritable value : values) {
                top3.offer(value.get());
                if (top3.size() > TOP_NUM) {
                    top3.poll();
                }
            }
            // Draining with poll() yields the survivors in ascending order.
            while (!top3.isEmpty()) {
                context.write(new Text(TOP3), new IntWritable(top3.poll()));
            }
        }
    }

    /**
     * Writes "Average" (integer division, i.e. the fractional part is discarded) once both the
     * total sum and a positive total count have been seen by this reducer instance.
     *
     * @param context task context the average is written to
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (sumSeen && countSeen && totalCount > 0) {
            context.write(new Text(AVERAGE), new IntWritable(totalSum / totalCount));
        }
    }

    /** Adds up all values, throwing {@link ArithmeticException} instead of wrapping on overflow. */
    private static int addAll(Iterable<IntWritable> values) {
        int total = 0;
        for (IntWritable value : values) {
            total = Math.addExact(total, value.get());
        }
        return total;
    }
}
```
驱动程序:
```
/**
 * Command-line entry point that configures and submits the job wiring {@code MyMapper} to
 * {@code MyReducer}.
 *
 * <p>Usage: {@code hadoop jar myjar.jar MyDriver <input path> <output path>}
 */
public class MyDriver {
    /**
     * Configures the job, runs it to completion and exits the JVM.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            // Report the mistake instead of failing with ArrayIndexOutOfBoundsException below.
            System.err.println("Usage: MyDriver <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "My MapReduce");
        job.setJarByClass(MyDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The job produces only a handful of global aggregates; pin it to one reduce task so they
        // all land in a single output file regardless of the cluster's configured default.
        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Exit status 0 on success, 1 if the job failed.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
运行:
```
hadoop jar myjar.jar MyDriver input.txt output
```
输出:
```
Count 10
Max 99
Min 1
Sum 505
Top3 97
Top3 98
Top3 99
```
阅读全文