用Java编写代码:通过MapReduce对2月 商品A 325 1月 商品A 235 1月 商品B 234 2月 商品C 572 3月 商品A 123 1月 商品C 321 2月 商品B 573 3月 商品C 876进行排序,要求先按月份升序排序,再按销售量降序排序
时间: 2024-01-07 10:06:04 浏览: 89
数据排序MapReduce实例
假设输入数据已经存储在Hadoop分布式文件系统中的一个文件中,每行为一个销售记录,格式为“月份 商品名称 销售量”,如下所示:
1月 商品A 235
1月 商品B 0
1月 商品C 0
2月 商品A 325
2月 商品B 573
2月 商品C 321
3月 商品A 123
3月 商品B 573
3月 商品C 876
下面是Java编写的MapReduce程序,实现对销售记录的排序:
```
import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SalesSorter {
public static class SalesMapper extends Mapper<Object, Text, Text, IntWritable> {
private Text monthAndProduct = new Text();
private IntWritable sales = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split(" ");
String month = tokens[0];
String product = tokens[1];
int salesAmount = Integer.parseInt(tokens[2]);
monthAndProduct.set(month + " " + product);
sales.set(salesAmount);
context.write(monthAndProduct, sales);
}
}
public static class SalesReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private TreeMap<Text, Integer> salesMap = new TreeMap<Text, Integer>(new SalesComparator());
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
salesMap.put(new Text(key), new Integer(sum));
}
public void cleanup(Context context) throws IOException, InterruptedException {
for (Map.Entry<Text, Integer> entry : salesMap.entrySet()) {
context.write(entry.getKey(), new IntWritable(entry.getValue()));
}
}
}
public static class SalesComparator implements Comparator<Text> {
public int compare(Text o1, Text o2) {
String[] tokens1 = o1.toString().split(" ");
String[] tokens2 = o2.toString().split(" ");
int month1 = Integer.parseInt(tokens1[0]);
int month2 = Integer.parseInt(tokens2[0]);
int sales1 = Integer.parseInt(tokens1[1]);
int sales2 = Integer.parseInt(tokens2[1]);
if (month1 < month2) {
return -1;
} else if (month1 > month2) {
return 1;
} else {
return (sales2 - sales1);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "SalesSorter");
job.setJarByClass(SalesSorter.class);
job.setMapperClass(SalesMapper.class);
job.setReducerClass(SalesReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
```
程序中定义了一个SalesComparator类,用于按照要求排序。在reduce函数中,将每个月份和商品名称作为Text类型的键,将销售量作为IntWritable类型的值,存储到一个TreeMap中。在cleanup函数中,将TreeMap中的每个键值对输出到输出文件中。在主函数中,设置了输入路径和输出路径,并启动MapReduce作业。
阅读全文