MapReduce:根据提示,在右侧编辑器补充代码,计算出每个手机号码的一年总流量。main 方法已给出,其中 Job 和输入输出路径已配置完成,无需更改;map 和 reduce 的输入输出 key、value 已给出;编程中直接写 map 与 reduce 过程的主要内容即可。获取输入的行,并以逗号(",")分隔成列表;将 key 为手机号、value 为当月总流量的键值对传入 reduce 中。
时间: 2024-02-25 21:58:34 浏览: 95
```java
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce job that totals the upstream and downstream flow recorded for each phone number.
 *
 * <p>Every input line is split on a literal delimiter (a tab unless overridden). The phone
 * number is taken from the second field; the upstream and downstream flow values are taken
 * from the third-to-last and second-to-last fields. The reducer adds up the values of all
 * records that share a phone number and writes one {@code FlowBean} per number.
 *
 * <p>NOTE(review): {@code FlowBean} is declared outside this file; it is presumably a Hadoop
 * {@code Writable} exposing {@code getUpFlow()} / {@code getDownFlow()} — confirm.
 */
public class FlowCount {

    /** Configuration key holding the literal field delimiter of the input lines. */
    public static final String DELIMITER_KEY = "flowcount.field.delimiter";

    /** Delimiter used when {@link #DELIMITER_KEY} is unset or empty. */
    private static final String DEFAULT_DELIMITER = "\t";

    /** Fewest fields a line may have so that every index read by the mapper is in range. */
    private static final int MIN_FIELDS = 3;

    /** Counter group and name under which skipped input lines are reported. */
    private static final String COUNTER_GROUP = "FlowCount";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /**
     * Parses one input line into a (phone number, flow) pair.
     *
     * <p>Lines with too few fields, or with non-numeric flow fields, are skipped and counted
     * instead of failing the whole task.
     */
    public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        /** Output key reused across records to avoid allocating a new Text per line. */
        private final Text phoneKey = new Text();

        /** Splitter for the configured delimiter; matched literally, never as a regex. */
        private Pattern fieldSplitter = Pattern.compile(DEFAULT_DELIMITER, Pattern.LITERAL);

        /**
         * Reads the field delimiter from the job configuration once per task.
         *
         * @param context task context giving access to the job configuration
         */
        @Override
        protected void setup(Context context) {
            String delimiter = context.getConfiguration().get(DELIMITER_KEY, DEFAULT_DELIMITER);
            if (delimiter.isEmpty()) {
                // An empty pattern would split between every character; fall back instead.
                delimiter = DEFAULT_DELIMITER;
            }
            fieldSplitter = Pattern.compile(delimiter, Pattern.LITERAL);
        }

        /**
         * Emits the phone number with the upstream/downstream flow found on the line.
         *
         * @param key input key supplied by the input format (not used)
         * @param value one line of input text
         * @param context sink for the emitted pair and for the malformed-record counter
         * @throws IOException if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = fieldSplitter.split(value.toString());
            if (fields.length < MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            long upFlow;
            long downFlow;
            try {
                upFlow = Long.parseLong(fields[fields.length - 3]);
                downFlow = Long.parseLong(fields[fields.length - 2]);
            } catch (NumberFormatException ignored) {
                // A non-numeric flow field marks a bad record; count it and move on.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            phoneKey.set(fields[1]);
            context.write(phoneKey, new FlowBean(upFlow, downFlow));
        }
    }

    /** Adds up the upstream and downstream flow of all records sharing a phone number. */
    public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        /**
         * Writes one {@code FlowBean} holding the summed flows for the given phone number.
         *
         * @param key phone number
         * @param values every flow record emitted for that phone number
         * @param context sink for the aggregated pair
         * @throws IOException if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upFlowSum = 0;
            long downFlowSum = 0;
            for (FlowBean flowBean : values) {
                upFlowSum += flowBean.getUpFlow();
                downFlowSum += flowBean.getDownFlow();
            }
            context.write(key, new FlowBean(upFlowSum, downFlowSum));
        }
    }

    /**
     * Configures and runs the job, exiting with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path, and optionally
     *     {@code args[2]} the literal field delimiter (for example {@code ,})
     * @throws Exception if the job cannot be set up or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: FlowCount <input path> <output path> [field delimiter]");
            System.exit(2);
            return;
        }

        Configuration conf = new Configuration();
        if (args.length > 2) {
            conf.set(DELIMITER_KEY, args[2]);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowCount.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
阅读全文