Using Java and Hadoop to compute each phone number's total upstream traffic, downstream traffic, and total traffic from the phone_data(3) file (with serialization)
First, the phone_data(3) file needs to be available in HDFS; it can be uploaded with the HDFS shell. Then a Java MapReduce program can sum each phone number's upstream traffic, downstream traffic, and total traffic, and emit the result through a custom serializable (Writable) value type.
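For example, assuming phone_data(3) sits in the current local directory, a command along these lines uploads it into the current user's HDFS home directory (the quotes are needed because the parentheses in the file name are special to the shell):
```
hdfs dfs -put 'phone_data(3)' .
```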
Sample code is shown below.
Mapper code:
```
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PhoneDataMapper extends Mapper<LongWritable, Text, Text, PhoneDataWritable> {

    private PhoneDataWritable phoneDataWritable = new PhoneDataWritable();
    private Text phoneNum = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each line is assumed to hold 7 comma-separated fields: field 1 is the
        // phone number; fields 4, 5, 6 are the upstream, downstream, and total traffic.
        String[] fields = value.toString().split(",");
        if (fields.length == 7) {
            phoneNum.set(fields[1]);
            phoneDataWritable.setUpFlow(Long.parseLong(fields[4]));
            phoneDataWritable.setDownFlow(Long.parseLong(fields[5]));
            phoneDataWritable.setTotalFlow(Long.parseLong(fields[6]));
            context.write(phoneNum, phoneDataWritable);
        }
    }
}
```
Reducer code:
```
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PhoneDataReducer extends Reducer<Text, PhoneDataWritable, Text, PhoneDataWritable> {

    private PhoneDataWritable phoneDataWritable = new PhoneDataWritable();

    @Override
    protected void reduce(Text key, Iterable<PhoneDataWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate the upstream, downstream, and total traffic for one phone number.
        long upFlow = 0;
        long downFlow = 0;
        long totalFlow = 0;
        for (PhoneDataWritable value : values) {
            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
            totalFlow += value.getTotalFlow();
        }
        phoneDataWritable.setUpFlow(upFlow);
        phoneDataWritable.setDownFlow(downFlow);
        phoneDataWritable.setTotalFlow(totalFlow);
        context.write(key, phoneDataWritable);
    }
}
```
PhoneDataWritable code:
```
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class PhoneDataWritable implements Writable {

    private long upFlow;
    private long downFlow;
    private long totalFlow;

    // A no-argument constructor is required so Hadoop can create instances via reflection.
    public PhoneDataWritable() {
    }

    public PhoneDataWritable(long upFlow, long downFlow, long totalFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = totalFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getTotalFlow() {
        return totalFlow;
    }

    public void setTotalFlow(long totalFlow) {
        this.totalFlow = totalFlow;
    }

    // Serialization: the write order must match the read order in readFields.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        totalFlow = in.readLong();
    }

    // Controls how the value is rendered in the text output files.
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + totalFlow;
    }
}
```
Driver code:
```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneDataDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "PhoneData");
        job.setJarByClass(PhoneDataDriver.class);
        job.setMapperClass(PhoneDataMapper.class);
        job.setReducerClass(PhoneDataReducer.class);
        // The map output types match the final output types, so these settings cover both phases.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneDataWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
Then start the MapReduce job from the command line:
```
hadoop jar PhoneData.jar PhoneDataDriver 'phone_data(3)' output
```
Here, PhoneData.jar is the packaged jar containing the MapReduce program, phone_data(3) is the input path (quoted because the parentheses are special to the shell), and output is the output directory.
Finally, the serialized result files can be found in the output directory.
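Since the job uses the default TextOutputFormat, each result line is the phone number followed by the toString() output of PhoneDataWritable (upstream, downstream, and total traffic, tab-separated). A quick way to inspect it, assuming a single reduce task produced a part-r-00000 file, is:
```
hdfs dfs -cat output/part-r-00000
```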