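Use a MapReduce program to count the number of employees hired each year. The final result must meet the following requirements:
1. Output format (年份 = year, 人数 = headcount):
年份:1980 人数:xxx
年份:1981 人数:xxx
.......
2. Two partitions:
Partition 0 stores records with hire year < 1982
Partition 1 stores records with hire year >= 1982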
Time: 2023-11-22 10:53:01
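Below is sample code for a MapReduce program that counts the number of employees hired each year:
Mapper stage:
```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (hire year, 1) for every input record.
public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final IntWritable outputKey = new IntWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        // Assumes the hire date is the third field, formatted yyyy-MM-dd.
        int year = Integer.parseInt(fields[2].split("-")[0]);
        outputKey.set(year);
        context.write(outputKey, ONE);
    }
}
```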
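The year extraction in the Mapper depends on the input layout: it expects the hire date in the third comma-separated field, in `yyyy-MM-dd` form. Before running the job, that assumption can be checked on a few sample lines with a small Hadoop-free sketch like the one below. `HireYearParser` and the sample records are hypothetical and not part of the original answer; unlike the Mapper, this helper reports lines that do not fit the layout instead of throwing.

```java
import java.util.OptionalInt;

/** Hypothetical helper, not part of the job: extracts the hire year from one CSV record. */
final class HireYearParser {
    private HireYearParser() {}

    /**
     * Assumes the hire date is the third comma-separated field, formatted yyyy-MM-dd.
     * Returns an empty OptionalInt for lines that do not match that layout.
     */
    static OptionalInt parseYear(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            return OptionalInt.empty();
        }
        String date = fields[2].trim();
        int dash = date.indexOf('-');
        // Everything before the first dash is taken as the year.
        String yearPart = dash >= 0 ? date.substring(0, dash) : date;
        try {
            return OptionalInt.of(Integer.parseInt(yearPart));
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        // Made-up sample lines, for illustration only.
        System.out.println(parseYear("7369,SMITH,1980-12-17"));
        System.out.println(parseYear("empno,ename,hiredate")); // header row, no year
    }
}
```

If the real data has a header row or a different column order, the field index and the handling of unparseable lines in the Mapper would need to be adjusted accordingly.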
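Reducer stage:
```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the 1s for each hire year and emits one formatted line per year.
public class EmployeeReducer extends Reducer<IntWritable, IntWritable, Text, NullWritable> {
    private final Text outputKey = new Text();

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // The whole line goes into the key; with a NullWritable value,
        // TextOutputFormat writes the key alone, without a tab separator.
        outputKey.set("年份:" + key.get() + " 人数:" + sum);
        context.write(outputKey, NullWritable.get());
    }
}
```
Driver stage:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeCounter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Employee Counter");
        job.setJarByClass(EmployeeCounter.class);
        job.setMapperClass(EmployeeMapper.class);
        job.setReducerClass(EmployeeReducer.class);
        // Map output types differ from the final output types, so declare both.
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // must not exist yet
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```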
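Setting different input and output paths in the Driver does not create partitions: a job has a single output directory, and a second call to `FileOutputFormat.setOutputPath` simply overwrites the first. The two-partition requirement is met with a custom `Partitioner` on the map output plus two reduce tasks (the class name `YearPartitioner` is chosen here for illustration):
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class YearPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        return key.get() < 1982 ? 0 : 1; // 0: hired before 1982; 1: hired in 1982 or later
    }
}
```
Register it in the Driver before `job.waitForCompletion` by calling `job.setPartitionerClass(YearPartitioner.class)` and `job.setNumReduceTasks(2)`. With two reduce tasks, partition 0 is written to `part-r-00000` and partition 1 to `part-r-00001` under the output directory.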
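The partition rule from the task (hire years before 1982 in partition 0, 1982 and later in partition 1) can be sanity-checked without a cluster. The sketch below mimics the map, partition, and reduce steps in memory using plain Java collections; `YearPartitionDemo`, its method names, and the sample records are hypothetical, and it assumes the hire date is the third comma-separated field in `yyyy-MM-dd` form.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Hadoop-free sketch that mimics map -> partition -> reduce in memory. */
final class YearPartitionDemo {
    private YearPartitionDemo() {}

    /** Partition rule from the task: years before 1982 go to 0, all others to 1. */
    static int partitionFor(int year) {
        return year < 1982 ? 0 : 1;
    }

    /** Returns one year -> headcount map per partition (index 0 and index 1). */
    static List<Map<Integer, Integer>> countByPartition(List<String> records) {
        // One sorted map per partition, mirroring the sorted keys each reducer sees.
        List<Map<Integer, Integer>> partitions = new ArrayList<>();
        partitions.add(new TreeMap<>());
        partitions.add(new TreeMap<>());
        for (String record : records) {
            // "Map" step: extract the year from the assumed third field.
            int year = Integer.parseInt(record.split(",")[2].split("-")[0]);
            // "Partition" and "reduce" steps: route by year, then add 1 to its count.
            partitions.get(partitionFor(year)).merge(year, 1, Integer::sum);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Made-up sample records, for illustration only.
        List<String> sample = Arrays.asList(
                "1,A,1980-12-17", "2,B,1981-02-20", "3,C,1981-09-08",
                "4,D,1982-01-23", "5,E,1987-04-19");
        List<Map<Integer, Integer>> partitions = countByPartition(sample);
        for (int p = 0; p < partitions.size(); p++) {
            System.out.println("partition " + p + ": " + partitions.get(p));
        }
    }
}
```

In a real job the same rule would live in a Hadoop `Partitioner`, and the job would need two reduce tasks for both partitions to be produced; this sketch only checks the routing and counting logic.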