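Modify the WordCount example in Java so that it accomplishes the following task: given an input file, compute the total salary of each person. The program should work both on a pseudo-distributed cluster and in an IDE.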
Posted: 2024-02-11 13:04:05
Sure, here is a Java implementation:
```java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryCount {

    // Mapper: assumes each input line is "<name> <salary>" separated by whitespace,
    // and emits (name, salary).
    public static class SalaryMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Text name = new Text();
        private final IntWritable salary = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            // Skip blank or incomplete lines instead of failing with NoSuchElementException
            if (tokenizer.countTokens() < 2) {
                return;
            }
            name.set(tokenizer.nextToken()); // person's name as the key
            try {
                salary.set(Integer.parseInt(tokenizer.nextToken())); // salary as the value
            } catch (NumberFormatException e) {
                return; // skip lines whose salary field is not an integer (e.g. a header line)
            }
            context.write(name, salary);
        }
    }

    // Reducer (also registered as the combiner): sums all salaries for one name.
    public static class SalaryReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result); // emit each person's total salary
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: SalaryCount <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "salary count");
        job.setJarByClass(SalaryCount.class);
        job.setMapperClass(SalaryMapper.class);
        job.setCombinerClass(SalaryReducer.class); // safe because summation is associative
        job.setReducerClass(SalaryReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
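The driver registers `SalaryReducer` as both the combiner and the reducer. That is only correct because summation is associative and commutative: adding per-map-task partial sums yields the same totals as adding every salary at once. The plain-Java sketch below (no Hadoop involved) checks this on made-up pairs; the class `CombinerCheck`, its methods, and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration (no Hadoop): reusing a summing reducer as the combiner
// gives the same totals as reducing every raw (name, salary) pair directly.
public class CombinerCheck {

    // Sums values per key, as SalaryReducer does for each name.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return totals;
    }

    // With combiner: pre-sum each map task's output, then sum the partial results.
    static Map<String, Integer> withCombiner(List<List<Map.Entry<String, Integer>>> mapOutputs) {
        List<Map.Entry<String, Integer>> partials = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> taskOutput : mapOutputs) {
            partials.addAll(sumByKey(taskOutput).entrySet());
        }
        return sumByKey(partials);
    }

    // Without combiner: send every raw pair straight to the reducer.
    static Map<String, Integer> withoutCombiner(List<List<Map.Entry<String, Integer>>> mapOutputs) {
        List<Map.Entry<String, Integer>> all = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> taskOutput : mapOutputs) {
            all.addAll(taskOutput);
        }
        return sumByKey(all);
    }

    public static void main(String[] args) {
        // Two hypothetical map tasks, each emitting (name, salary) pairs.
        List<List<Map.Entry<String, Integer>>> mapOutputs = List.of(
                List.of(Map.entry("alice", 3000), Map.entry("bob", 2000), Map.entry("alice", 1500)),
                List.of(Map.entry("bob", 2500), Map.entry("alice", 500)));
        System.out.println(withCombiner(mapOutputs));    // prints {alice=5000, bob=4500}
        System.out.println(withoutCombiner(mapOutputs)); // prints {alice=5000, bob=4500}
    }
}
```

The same shortcut does not hold for a non-associative aggregate such as an average, which is why a reducer cannot always double as a combiner.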
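This program defines two classes, `SalaryMapper` and `SalaryReducer`, which implement the map and reduce functions respectively. The map function extracts the person's name and salary from each input line and emits them as a key-value pair; the reduce function sums all salaries that share the same name and writes out each person's total.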
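To trace the map, shuffle, and reduce steps without a cluster, the plain-Java sketch below mirrors the job's logic on a few in-memory lines. It is not Hadoop code: the class `SalaryFlowSketch`, its methods, and the sample lines are hypothetical. It assumes the same whitespace-separated `name salary` line format that the mapper parses with `StringTokenizer`, and, as an extra assumption of this sketch, it skips blank or non-numeric lines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical plain-Java walk-through of SalaryCount's data flow; it does not run Hadoop.
public class SalaryFlowSketch {

    // Map phase: one "name salary" line -> one (name, salary) pair, or null if the line is unusable.
    static Map.Entry<String, Integer> map(String line) {
        StringTokenizer tokenizer = new StringTokenizer(line);
        if (tokenizer.countTokens() < 2) {
            return null; // blank or incomplete line
        }
        String name = tokenizer.nextToken();
        try {
            return Map.entry(name, Integer.parseInt(tokenizer.nextToken()));
        } catch (NumberFormatException e) {
            return null; // salary field is not an integer
        }
    }

    // Shuffle: group salaries by name (TreeMap keeps keys sorted, like Hadoop's sort phase).
    // Reduce: sum each group.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (String line : lines) {
            Map.Entry<String, Integer> pair = map(line);
            if (pair != null) {
                groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            int sum = 0;
            for (int salary : group.getValue()) {
                sum += salary;
            }
            totals.put(group.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> input = List.of("alice 3000", "bob 2000", "", "alice 1500", "name salary");
        System.out.println(run(input)); // prints {alice=4500, bob=2000}
    }
}
```

For ASCII names the `TreeMap` order matches the byte-wise ordering Hadoop uses for `Text` keys, so the printed order resembles the job's output file.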
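The `main` method configures the job (mapper, combiner, reducer, output key/value types, and the input and output paths taken from `args[0]` and `args[1]`) and submits it. On a pseudo-distributed cluster, package the class into a JAR and run it with `hadoop jar`, passing HDFS input and output paths. In an IDE, run `main` directly with the two paths as program arguments; if no Hadoop configuration files (such as `core-site.xml`) are on the classpath, Hadoop's defaults use the local file system and the local job runner. In either environment the output directory must not exist beforehand, otherwise the job fails with a `FileAlreadyExistsException`.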