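Implementing with MapReduce: the training data are users' electricity consumption records collected by the State Grid monitoring system (one reading every 15 minutes), and the sample data structure is shown in Table 3.1. Using big-data technology, write a program that periodically uploads the data from the local system to the big-data cluster, totals each month's electricity consumption by region, and computes, for each region and month, the shares of urban, rural, and mountain-area consumption. Source code requested.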
Time: 2024-03-31 13:38:32
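Below is a simple MapReduce program for the statistics part of the requirements (monthly totals per region and the urban/rural/mountain shares):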
Mapper:
```java
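import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Assumed record layout (Table 3.1 is not reproduced here):
// userId,region,timestamp(yyyy-MM-dd HH:mm:ss),usage,areaType
public class ElectricityMapper extends Mapper<Object, Text, Text, Text> {
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length < 5 || fields[2].trim().length() < 7) {
            return; // skip malformed lines
        }
        try {
            double usage = Double.parseDouble(fields[3].trim()); // readings may be fractional
            // Key = region + month ("yyyy-MM"); the area type (城市/农村/山区) travels in the value
            outKey.set(fields[1].trim() + "," + fields[2].trim().substring(0, 7));
            outValue.set(fields[4].trim() + ":" + usage);
            context.write(outKey, outValue);
        } catch (NumberFormatException e) {
            // skip header rows or non-numeric readings
        }
    }
}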
```
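Because Table 3.1 is not reproduced, the column layout is a guess. This sketch assumes `userId,region,timestamp,usage,areaType`. Keeping the parsing in a plain static method lets it be checked on a hand-written line without a cluster. `RecordParser`, `toKeyValue`, and the sample line are hypothetical and use only the JDK.

```java
import java.util.Optional;

// Hypothetical helper mirroring the map-side parsing, for local testing only.
// Assumed layout: userId,region,timestamp(yyyy-MM-dd HH:mm:ss),usage,areaType
final class RecordParser {
    private RecordParser() {}

    /** Returns {key, value} = {"region,yyyy-MM", "areaType:usage"}, or empty for a bad line. */
    static Optional<String[]> toKeyValue(String line) {
        String[] f = line.split(",");
        if (f.length < 5 || f[2].trim().length() < 7) {
            return Optional.empty();
        }
        try {
            double usage = Double.parseDouble(f[3].trim());
            String key = f[1].trim() + "," + f[2].trim().substring(0, 7);
            return Optional.of(new String[] {key, f[4].trim() + ":" + usage});
        } catch (NumberFormatException e) {
            return Optional.empty(); // header row or non-numeric reading
        }
    }

    public static void main(String[] args) {
        String[] kv = toKeyValue("u001,Hangzhou,2024-03-15 08:15:00,1.25,城市").orElseThrow();
        System.out.println(kv[0] + " -> " + kv[1]); // prints Hangzhou,2024-03 -> 城市:1.25
    }
}
```

Putting the region and month in the key means every reading of one region-month reaches the same `reduce` call. Carrying the area type in the value is what lets the reducer split that total into shares.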
Reducer:
```java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ElectricityReducer extends Reducer<Text, Text, Text, Text> {
    private final Text result = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // key = "region,yyyy-MM"; each value = "areaType:usage" as written by ElectricityMapper
        double sum = 0;
        double cityElectricity = 0;
        double countryElectricity = 0;
        double mountainElectricity = 0;
        for (Text val : values) {
            String[] parts = val.toString().split(":", 2);
            if (parts.length != 2) {
                continue;
            }
            double usage = Double.parseDouble(parts[1]);
            sum += usage;
            switch (parts[0]) {
                case "城市": // urban
                    cityElectricity += usage;
                    break;
                case "农村": // rural
                    countryElectricity += usage;
                    break;
                case "山区": // mountain area
                    mountainElectricity += usage;
                    break;
                default: // unknown area types count toward the total only
                    break;
            }
        }
        if (sum == 0) {
            return; // no usable readings for this region-month; avoid dividing by zero
        }
        // Shares are written as percentages with two decimals
        result.set(String.format("total=%.2f,urban=%.2f%%,rural=%.2f%%,mountain=%.2f%%",
                sum, 100 * cityElectricity / sum, 100 * countryElectricity / sum,
                100 * mountainElectricity / sum));
        context.write(key, result);
    }
}
```
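The share calculation for one region-month key is plain arithmetic, so it can be reproduced outside Hadoop and checked by hand. This is a minimal sketch, assuming each value has the form `areaType:usage` and the area types are 城市, 农村, and 山区. `ShareCalculator`, `summarize`, and the sample readings are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone version of the reducer's arithmetic for one region-month key.
final class ShareCalculator {
    private ShareCalculator() {}

    /** Returns the total plus each area type's share of it in percent (0 when the total is 0). */
    static Map<String, Double> summarize(List<String> values) {
        Map<String, Double> byType = new LinkedHashMap<>();
        byType.put("城市", 0.0); // urban
        byType.put("农村", 0.0); // rural
        byType.put("山区", 0.0); // mountain area
        double total = 0;
        for (String v : values) {
            String[] p = v.split(":", 2); // assumed value format "areaType:usage"
            if (p.length != 2) {
                continue;
            }
            double usage = Double.parseDouble(p[1]);
            total += usage;
            // Unknown area types add to the total but get no share of their own
            byType.computeIfPresent(p[0], (type, old) -> old + usage);
        }
        Map<String, Double> result = new LinkedHashMap<>();
        result.put("total", total);
        for (Map.Entry<String, Double> e : byType.entrySet()) {
            result.put(e.getKey(), total == 0 ? 0.0 : 100 * e.getValue() / total);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical readings for a key such as "Hangzhou,2024-03"
        System.out.println(summarize(List.of("城市:60", "农村:30", "山区:10", "城市:100")));
        // prints {total=200.0, 城市=80.0, 农村=15.0, 山区=5.0}
    }
}
```

In the example, urban usage is 60 + 100 = 160 out of a total of 200, which gives 160 / 200 × 100 = 80%.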
Driver:
```java
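import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ElectricityDriver {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: ElectricityDriver <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "electricity analysis");
        job.setJarByClass(ElectricityDriver.class);
        job.setMapperClass(ElectricityMapper.class);
        job.setReducerClass(ElectricityReducer.class);
        // Mapper and reducer both emit Text/Text, so these settings cover both stages
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}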
```
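Save each class above in its own Java file, compile them against the Hadoop libraries, package them into a JAR, and run the job on the Hadoop cluster. Pass the input path and the output path as the two program arguments. The output directory must not already exist, or the job will refuse to start.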
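The question also asks for the data to be uploaded from the local system to the cluster on a schedule, which the code above does not cover. Below is a minimal sketch, assuming the `hdfs` command-line client is installed and on the PATH of the machine that holds the data. A `ScheduledExecutorService` runs `hdfs dfs -put -f` for every `*.csv` file every 15 minutes. The class name `PeriodicUploader`, the local directory `/data/electricity`, and the HDFS directory `/electricity/input` are hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical uploader: assumes the `hdfs` CLI is on PATH; all paths are placeholders.
final class PeriodicUploader {
    private PeriodicUploader() {}

    /** Builds `hdfs dfs -put -f <local file> <hdfs dir>`; -f overwrites a file of the same name. */
    static List<String> putCommand(Path localFile, String hdfsDir) {
        return List.of("hdfs", "dfs", "-put", "-f", localFile.toString(), hdfsDir);
    }

    /** Uploads every *.csv file in localDir once and returns the number of failed uploads. */
    static int uploadOnce(Path localDir, String hdfsDir) throws IOException, InterruptedException {
        int failures = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(localDir, "*.csv")) {
            for (Path file : files) {
                Process p = new ProcessBuilder(putCommand(file, hdfsDir)).inheritIO().start();
                if (p.waitFor() != 0) {
                    failures++;
                }
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        Path localDir = Paths.get("/data/electricity"); // hypothetical local directory
        String hdfsDir = "/electricity/input";          // hypothetical HDFS input directory
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Run once immediately, then every 15 minutes to match the metering interval
        scheduler.scheduleAtFixedRate(() -> {
            try {
                System.out.println("upload finished, failures: " + uploadOnce(localDir, hdfsDir));
            } catch (IOException e) {
                System.err.println("upload failed: " + e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the executor
            }
        }, 0, 15, TimeUnit.MINUTES);
    }
}
```

This sketch re-uploads every file on each run. A real deployment would likely track which files have already been sent. An OS scheduler such as cron calling the same `hdfs dfs -put` command would work just as well. The HDFS directory is then the input path passed to `ElectricityDriver`.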