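MapReduce: Following the hints, complete the code in the editor on the right so that it computes each phone number's total flow for the year. The `main` method is already provided, with the Job and the input/output paths configured, and needs no changes. The input and output key/value types of map and reduce are also given, so you only need to write the core logic of the map and reduce steps. Read each input line and split it on `,` into a list. Add the month's upstream flow and downstream flow together. Then pass the phone number as the key and that month's total flow as the value to reduce.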
Posted: 2024-02-25 22:58:21
Here is the code:
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MobileFlow {

    // Mapper: one input line -> (phone number, FlowBean holding that line's counters)
    public static class MobileFlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        private final Text phoneNum = new Text();
        private final FlowBean flow = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Assumed column layout: phone,upFlow,downFlow,upCount,downCount
            String[] fields = value.toString().split(",");
            if (fields.length < 5) {
                return; // skip blank or malformed lines
            }
            phoneNum.set(fields[0].trim());
            flow.setUpFlow(Long.parseLong(fields[1].trim()));
            flow.setDownFlow(Long.parseLong(fields[2].trim()));
            flow.setUpCount(Long.parseLong(fields[3].trim()));
            flow.setDownCount(Long.parseLong(fields[4].trim()));
            // Reusing phoneNum/flow is safe: context.write serializes them right away
            context.write(phoneNum, flow);
        }
    }

    // Reducer: sum every counter over all records of one phone number
    public static class MobileFlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        private final FlowBean result = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upFlowTotal = 0;
            long downFlowTotal = 0;
            long upCountTotal = 0;
            long downCountTotal = 0;
            // Hadoop reuses one FlowBean instance across iterations, so read the values immediately
            for (FlowBean value : values) {
                upFlowTotal += value.getUpFlow();
                downFlowTotal += value.getDownFlow();
                upCountTotal += value.getUpCount();
                downCountTotal += value.getDownCount();
            }
            result.setUpFlow(upFlowTotal);
            result.setDownFlow(downFlowTotal);
            result.setUpCount(upCountTotal);
            result.setDownCount(downCountTotal);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(MobileFlow.class);
        job.setMapperClass(MobileFlowMapper.class);
        job.setReducerClass(MobileFlowReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

    // Custom value type. It must implement Writable so Hadoop can serialize it
    // between the map and reduce phases, and it needs a public no-arg
    // constructor because the framework creates instances by reflection.
    public static class FlowBean implements Writable {
        private long upFlow;
        private long downFlow;
        private long upCount;
        private long downCount;

        public FlowBean() {
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public long getUpFlow() {
            return upFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public void setUpCount(long upCount) {
            this.upCount = upCount;
        }

        public long getUpCount() {
            return upCount;
        }

        public void setDownCount(long downCount) {
            this.downCount = downCount;
        }

        public long getDownCount() {
            return downCount;
        }

        // Total flow = upstream flow + downstream flow
        public long getTotalFlow() {
            return upFlow + downFlow;
        }

        // Field order here must match readFields exactly
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(upCount);
            out.writeLong(downCount);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            upCount = in.readLong();
            downCount = in.readLong();
        }

        // Output columns: upFlow, downFlow, upCount, downCount, totalFlow
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + upCount + "\t" + downCount + "\t" + getTotalFlow();
        }
    }
}
```
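In the Mapper, each line is split on commas; the first column becomes the key and the next four columns become the value, wrapped in a custom FlowBean class. Because FlowBean is passed from the map phase to the reduce phase, it must implement Hadoop's `Writable` interface (`write`, `readFields`, and a public no-argument constructor); otherwise Hadoop cannot find a serializer for it and the job fails.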
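The task itself asks for something simpler than the FlowBean approach: the map step only has to add the month's upstream and downstream flow and emit `phone number -> monthly total`. The plain-Java sketch below has no Hadoop dependency, so it can be run on its own; it shows only that parsing logic. It assumes the same column layout as the code above (`phone,upFlow,downFlow,...`), and the names `MonthlyFlowMapSketch` and `mapLine` are hypothetical.

```java
import java.util.Map;

// Hypothetical helper mirroring the body of map(): one CSV line -> (phone, up + down).
// Assumed column layout: phone,upFlow,downFlow[,...]; any extra columns are ignored.
class MonthlyFlowMapSketch {

    static Map.Entry<String, Long> mapLine(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            throw new IllegalArgumentException("expected at least 3 columns: " + line);
        }
        String phone = fields[0].trim();
        long upFlow = Long.parseLong(fields[1].trim());
        long downFlow = Long.parseLong(fields[2].trim());
        // The month's total flow is upstream + downstream
        return Map.entry(phone, upFlow + downFlow);
    }

    public static void main(String[] args) {
        Map.Entry<String, Long> kv = mapLine("13800000000,100,200,3,4");
        System.out.println(kv.getKey() + " " + kv.getValue()); // prints "13800000000 300"
    }
}
```

Inside a real Hadoop mapper, and assuming the given map output types are `Text` and `LongWritable` (the exercise's actual types are not shown), the same result would be emitted with `context.write(new Text(phone), new LongWritable(upFlow + downFlow))`.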
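In the Reducer, the upstream flow, downstream flow, and both counts are summed across all records of each phone number, and the totals are written out.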
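What happens between the two phases can be imitated locally in plain Java: group the map outputs by key (the shuffle) and sum each group (the reduce). This is only a sketch for checking the arithmetic, not how Hadoop actually executes a job; `YearlyFlowReduceSketch` and `reduceAll` are hypothetical names, and the input is assumed to be the `(phone, monthly total)` pairs described in the task.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local imitation of shuffle + reduce for (phone, monthly total) pairs.
class YearlyFlowReduceSketch {

    static Map<String, Long> reduceAll(List<Map.Entry<String, Long>> mapOutputs) {
        // TreeMap keeps keys sorted, similar to the sorted key order a reducer sees
        Map<String, Long> yearlyTotals = new TreeMap<>();
        for (Map.Entry<String, Long> kv : mapOutputs) {
            // Same role as the loop over values in reduce(): a running sum per key
            yearlyTotals.merge(kv.getKey(), kv.getValue(), Long::sum);
        }
        return yearlyTotals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> pairs = List.of(
                Map.entry("13800000000", 300L),
                Map.entry("13900000000", 50L),
                Map.entry("13800000000", 700L));
        System.out.println(reduceAll(pairs)); // prints {13800000000=1000, 13900000000=50}
    }
}
```

In a Hadoop reducer typed, by assumption, as `Reducer<Text, LongWritable, Text, LongWritable>`, the loop body becomes `sum += v.get();` over `Iterable<LongWritable> values`, followed by `context.write(key, new LongWritable(sum))`.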
Finally, `main` sets the mapper, reducer, output types, and input/output paths, then submits the job to the cluster and waits for it to finish.