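Writing a phone-bill statistics MR program in Java: code to compute each phone number's total traffic, including upstream and downstream traffic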
Posted: 2024-03-21 14:39:12
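Sure. Here is a sample Java MapReduce program for phone traffic statistics, written against Hadoop's classic `org.apache.hadoop.mapred` API: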
```java
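import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.*;

public class PhoneFlowCount {
    // Custom value type: upstream, downstream and total traffic of one phone number.
    // It implements Writable so Hadoop can serialize it between the map and reduce phases.
    public static class FlowBean implements Writable {
        private long upFlow;
        private long downFlow;
        private long sumFlow;
        public FlowBean() {} // no-arg constructor required for deserialization
        public void set(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow; // total traffic = upstream + downstream
        }
        public long getUpFlow() { return upFlow; }
        public long getDownFlow() { return downFlow; }
        public long getSumFlow() { return sumFlow; }
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
        // TextOutputFormat writes the value using toString()
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    }

    // With TextInputFormat the input key is the line's byte offset (LongWritable), not Text
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, FlowBean> {
        private final Text phoneNum = new Text();
        private final FlowBean flowBean = new FlowBean();
        public void map(LongWritable key, Text value, OutputCollector<Text, FlowBean> output, Reporter reporter)
                throws IOException {
            // Expected line format: phoneNumber<TAB>upFlow<TAB>downFlow
            String[] fields = value.toString().split("\t");
            phoneNum.set(fields[0]);
            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);
            flowBean.set(upFlow, downFlow);
            output.collect(phoneNum, flowBean);
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, FlowBean, Text, FlowBean> {
        private final FlowBean result = new FlowBean();
        public void reduce(Text key, Iterator<FlowBean> values, OutputCollector<Text, FlowBean> output,
                Reporter reporter) throws IOException {
            long upFlow = 0;
            long downFlow = 0;
            // Sum upstream and downstream traffic over all records for this phone number
            while (values.hasNext()) {
                FlowBean bean = values.next();
                upFlow += bean.getUpFlow();
                downFlow += bean.getDownFlow();
            }
            result.set(upFlow, downFlow); // also computes the total
            output.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(PhoneFlowCount.class);
        conf.setJobName("phone_flow_count");
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(FlowBean.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}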
```
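In the code above, `FlowBean` is a custom data type that stores the upstream and downstream traffic of a phone number; because Hadoop serializes map output before passing it to the reducers, it must implement the `Writable` interface. In the Map phase, each input line is split on tab characters, and the phone number is emitted as the key with its upstream and downstream traffic as the value. Hadoop then groups all values that share a key, so in the Reduce phase each call receives every record for one phone number. The reducer adds up the upstream and downstream traffic separately, and their sum is that number's total traffic, which is emitted as the output value.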
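To see the map → group-by-key → reduce flow without a Hadoop cluster, the sketch below simulates it in plain Java. It is not Hadoop code: the class `FlowCountSimulation`, the record `Flow` (a stand-in for `FlowBean`), the method `run`, and the sample phone records are all hypothetical, and it assumes Java 16+ for `record`. A `TreeMap` plays the role of Hadoop's shuffle, which sorts and groups the map output by key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FlowCountSimulation {

    // Hypothetical plain-Java stand-in for FlowBean (no Hadoop Writable involved)
    public record Flow(long up, long down) {
        public long total() { return up + down; }
    }

    // "Map" step: parse one line "phone<TAB>up<TAB>down" into a (phone, Flow) pair
    static Map.Entry<String, Flow> map(String line) {
        String[] f = line.split("\t");
        return Map.entry(f[0], new Flow(Long.parseLong(f[1]), Long.parseLong(f[2])));
    }

    // "Reduce" step: sum all Flow values emitted for one phone number
    static Flow reduce(List<Flow> values) {
        long up = 0;
        long down = 0;
        for (Flow v : values) {
            up += v.up();
            down += v.down();
        }
        return new Flow(up, down);
    }

    // Runs map, groups values by key (simulating the shuffle), then reduces each group
    public static Map<String, Flow> run(List<String> lines) {
        Map<String, List<Flow>> grouped = new TreeMap<>(); // sorted keys, like Hadoop's sort
        for (String line : lines) {
            Map.Entry<String, Flow> kv = map(line);
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Map<String, Flow> result = new TreeMap<>();
        grouped.forEach((phone, flows) -> result.put(phone, reduce(flows)));
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample records: phone \t upFlow \t downFlow
        List<String> lines = List.of(
                "13800000001\t100\t200",
                "13800000002\t50\t60",
                "13800000001\t300\t400");
        // Prints one line per phone: phone, up, down, total
        run(lines).forEach((phone, f) ->
                System.out.println(phone + "\t" + f.up() + "\t" + f.down() + "\t" + f.total()));
    }
}
```

With the sample data, `13800000001` appears twice, so its two records are grouped and summed to 400 up, 600 down and 1000 in total, while `13800000002` passes through with 50, 60 and 110. This mirrors what the `Reduce` class does per key in the real job.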