Use the Hadoop Java API to solve the following problem. The input data is:

7369,SMITH,CLERK,7902,1980-12-17,800,null,20
7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30
7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30
7566,JONES,MANAGER,7839,1981-04-02,2975,null,20
7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981-05-01,2850,null,30
7782,CLARK,MANAGER,7839,1981-06-09,2450,null,10
7788,SCOTT,ANALYST,7566,1987-04-19,3000,null,20
7839,KING,PRESIDENT,null,1981-11-17,5000,null,10
7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30
7876,ADAMS,CLERK,7788,1987-05-23,1100,null,20
7900,JAMES,CLERK,7698,1981-12-03,950,null,30
7902,FORD,ANALYST,7566,1981-12-02,3000,null,20
7934,MILLER,CLERK,7782,1982-01-23,1300,null,10

Write a MapReduce program that counts the number of employees hired in each year. The final result must satisfy:
1. Output format:
年份:1980 人数:xxx
年份:1981 人数:xxx
.......
2. Two partitions: partition 0 stores records with hire year < 1982; partition 1 stores records with hire year >= 1982.
Time: 2023-07-27 13:13:59  Views: 81
Here is an implementation using the Hadoop Java API:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeCountByYear {

    public static class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        private static final int HIRE_DATE_INDEX = 4;
        private final IntWritable outKey = new IntWritable();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            // The hire date (e.g. 1980-12-17) is field 4; its first four characters are the year
            int hireYear = Integer.parseInt(tokens[HIRE_DATE_INDEX].substring(0, 4));
            outKey.set(hireYear);
            context.write(outKey, one); // emit (year, 1) so the reducer can sum per year
        }
    }

    public static class EmployeePartitioner extends Partitioner<IntWritable, IntWritable> {
        private static final int YEAR_THRESHOLD = 1982;

        @Override
        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            // Partition 0: hire year < 1982; partition 1: hire year >= 1982.
            // Partitioning by key guarantees all records of one year reach the same reducer.
            return key.get() < YEAR_THRESHOLD ? 0 : 1;
        }
    }

    public static class EmployeeReducer extends Reducer<IntWritable, IntWritable, Text, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws java.io.IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            // One output line per year, in the required format
            context.write(new Text("年份:" + key.get() + " 人数:" + count), NullWritable.get());
        }
    }
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Employee Count By Year");
job.setJarByClass(EmployeeCountByYear.class);
job.setMapperClass(EmployeeMapper.class);
job.setReducerClass(EmployeeReducer.class);
job.setPartitionerClass(EmployeePartitioner.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(2); // 2 partitions
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
```
This code implements a MapReduce job that counts the number of hires per year and splits the output into two partitions by hire year:
1. Mapper: reads each input line, extracts the hire year from the date field (index 4), and emits the pair `(year, 1)`.
2. Partitioner: routes records by key: years before 1982 go to partition 0 (the first reducer), years from 1982 onward go to partition 1 (the second reducer). Since partitioning is by year, all records for a given year land in the same partition.
3. Reducer: sums the `1`s for each year and writes one line per year in the format `年份:xxxx 人数:xx`.
In `main`, the number of reduce tasks is set to 2 (one per partition), the Mapper, Reducer, and Partitioner classes are registered, the map and final output key/value classes are declared, and the input and output paths are taken from the command-line arguments.
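As a quick sanity check of the counting logic (not part of the MapReduce job itself), the same per-record parsing can be replayed in plain Java against the sample data, without a Hadoop cluster. The class name `HireYearSanityCheck` is made up for illustration:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper class: applies the same logic as the Mapper (extract the
// hire year) and the Reducer (sum per year) to the sample records in memory.
public class HireYearSanityCheck {

    static final String[] RECORDS = {
            "7369,SMITH,CLERK,7902,1980-12-17,800,null,20",
            "7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30",
            "7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30",
            "7566,JONES,MANAGER,7839,1981-04-02,2975,null,20",
            "7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30",
            "7698,BLAKE,MANAGER,7839,1981-05-01,2850,null,30",
            "7782,CLARK,MANAGER,7839,1981-06-09,2450,null,10",
            "7788,SCOTT,ANALYST,7566,1987-04-19,3000,null,20",
            "7839,KING,PRESIDENT,null,1981-11-17,5000,null,10",
            "7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30",
            "7876,ADAMS,CLERK,7788,1987-05-23,1100,null,20",
            "7900,JAMES,CLERK,7698,1981-12-03,950,null,30",
            "7902,FORD,ANALYST,7566,1981-12-02,3000,null,20",
            "7934,MILLER,CLERK,7782,1982-01-23,1300,null,10"
    };

    // Count hires per year: parse field 4, keep the first four characters as the year
    static Map<Integer, Integer> countByYear(String[] records) {
        Map<Integer, Integer> counts = new TreeMap<>(); // sorted by year
        for (String line : records) {
            int year = Integer.parseInt(line.split(",")[4].substring(0, 4));
            counts.merge(year, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Print each year with its count and the partition it would be routed to
        countByYear(RECORDS).forEach((year, n) -> System.out.println(
                "年份:" + year + " 人数:" + n + " (partition " + (year < 1982 ? 0 : 1) + ")"));
    }
}
```

Running this prints four lines (1980, 1981, 1982, 1987), which is useful for verifying the job's output after a real run on the cluster.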