下面给出北京(bj)、青岛(qd)、昆明(km)、大连(wl)、郑州(hn)、武汉(wh)2015年至2017年的空气质量数据。 请用map/reduce统计各年份各城市空气质量情况。例如2015年北京空气质量结果如下图:用Java的MapReduce
时间: 2024-09-21 17:11:17 浏览: 37
在Java中使用MapReduce处理大量数据,特别是像空气质量数据这种结构化的历史记录,通常需要将数据集分为两个阶段:Map阶段和Reduce阶段。
首先,我们需要定义Mapper函数,它接收的是每条原始的空气质量记录(包含城市名、年份和空气质量指标),然后对每个年份的城市进行分组,并计算该城市的空气质量总值或者平均值。Mapper会将每条记录转化为键值对,键可能是`(城市名, 年份)`,值则是空气质量指标的值。
```java
// Mapper示例
public void map(String cityYearStr, String airQualityData, Context context) {
String[] parts = cityYearStr.split("-");
String cityName = parts[0];
int year = Integer.parseInt(parts[1]);
String[] data = airQualityData.split(",");
double quality = Double.parseDouble(data[0]); // 假设第一个元素是空气质量指标
context.write(new Text(cityName + "_" + year), new IntWritable(quality)); // 输出键值对,键是城市年份,值是质量数值
}
```
接着,在Reducer阶段,我们收集来自Mapper的所有相关数据,对每个城市每年的数据进行累加或求平均,生成最终的结果。Reducer接收到的是城市名和所有对应年份空气质量值的集合。
```java
// Reducer示例
public void reduce(Text key, Iterable<IntWritable> values, Context context) {
String[] citiesAndYears = key.toString().split("_");
String cityName = citiesAndYears[0];
int year = Integer.parseInt(citiesAndYears[1]);
int sumQuality = 0;
for (IntWritable value : values) {
sumQuality += value.get();
}
context.write(new Text(cityName + "," + year), new IntWritable(sumQuality)); // 或者DoubleWritable计算平均值
}
```
最后,MapReduce框架会自动处理并排序数据,生成各年份各城市空气质量汇总报告。
阅读全文