java编写话费统计的MR程序,计算每个电话号码的总流量,包括上行流量,下行流量。
时间: 2024-04-04 09:29:51 浏览: 66
以下是一个简单的Java MapReduce程序,用于统计每个电话号码的总流量,包括上行流量和下行流量:
```java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CallRecordMR {
public static class CallRecordMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text phoneNum = new Text();
private Text flow = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line, ",");
String num = tokenizer.nextToken();
String upFlowStr = tokenizer.nextToken();
String downFlowStr = tokenizer.nextToken();
int upFlow = Integer.parseInt(upFlowStr);
int downFlow = Integer.parseInt(downFlowStr);
int totalFlow = upFlow + downFlow;
phoneNum.set(num);
flow.set(String.valueOf(totalFlow));
context.write(phoneNum, flow);
}
}
public static class CallRecordReducer extends Reducer<Text, Text, Text, IntWritable> {
private IntWritable totalFlow = new IntWritable();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
totalFlow.set(sum);
context.write(key, totalFlow);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Call Record MR");
job.setJarByClass(CallRecordMR.class);
job.setMapperClass(CallRecordMapper.class);
job.setReducerClass(CallRecordReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
```
假设输入文件的格式如下:
```
13912345678,100,200
13987654321,50,150
13912345678,150,100
13987654321,100,50
```
其中每一行表示一个电话号码、上行流量和下行流量,用逗号分隔。程序的输出会统计每个电话号码的总流量,结果如下:
```
13912345678 350
13987654321 350
```
注意,这个程序中使用的是Text类型作为map输出的key和value类型,因此在Mapper中需要将电话号码和总流量都转换成String类型,并使用Text类型进行封装。在Reducer中需要对所有的总流量进行累加,最终输出的也是Text类型。如果需要输出Int类型,需要在Reducer中对结果进行转换。
阅读全文