销售数据如下图所示，其中各字段依次为：1. 产品ID；2. 客户ID；3. 日期；4. 渠道ID；5. 促销ID；6. 销售数量（件）；7. 销售总额（元）。请使用 MapReduce 求出各年的销售笔数、销售总额和销售平均值，并用 Java 编程实现。
时间: 2024-02-22 21:58:25 浏览: 170
好的，下面是按照 MapReduce 编程模型、用 Java 实现的代码：
```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SalesAnalysis {
// Mapper 阶段
public static class SalesMapper extends Mapper<Object, Text, Text, SalesDataWritable> {
private Text outKey = new Text();
private SalesDataWritable outValue = new SalesDataWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
int year = getYear(fields[2]);
int productId = Integer.parseInt(fields[0]);
int customerId = Integer.parseInt(fields[1]);
int salesQty = Integer.parseInt(fields[5]);
double salesAmount = Double.parseDouble(fields[6]);
outKey.set(Integer.toString(year));
outValue.set(productId, customerId, salesQty, salesAmount);
context.write(outKey, outValue);
}
private int getYear(String dateString) {
return Integer.parseInt(dateString.substring(0, 4));
}
}
// Reducer 阶段
public static class SalesReducer extends Reducer<Text, SalesDataWritable, Text, SalesDataWritable> {
private SalesDataWritable outValue = new SalesDataWritable();
public void reduce(Text key, Iterable<SalesDataWritable> values, Context context) throws IOException, InterruptedException {
int totalQty = 0;
double totalAmount = 0.0;
int count = 0;
for (SalesDataWritable value : values) {
totalQty += value.getSalesQty();
totalAmount += value.getSalesAmount();
count++;
}
double averageAmount = totalAmount / totalQty;
outValue.set(0, 0, totalQty, totalAmount);
context.write(key, outValue);
outValue.set(0, 0, count, averageAmount);
context.write(key, outValue);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Sales Analysis");
job.setJarByClass(SalesAnalysis.class);
job.setMapperClass(SalesMapper.class);
job.setReducerClass(SalesReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(SalesDataWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
```
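上述代码中的 `SalesDataWritable` 是自定义的 Hadoop `Writable` 数据类型，用于在 Map 阶段和 Reduce 阶段之间传递销售数据，也用作最终输出的值。由于它是 `public` 类，需要单独放在 `SalesDataWritable.java` 文件中（与 `SalesAnalysis` 位于同一个包）。具体实现如下：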
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Locale;
import org.apache.hadoop.io.Writable;
/**
 * Hadoop {@link Writable} value holding one sales record, or an aggregate of records
 * whose ID fields are set to 0.
 *
 * <p>Wire format, written by {@link #write} and read back in the same order by
 * {@link #readFields}: productId (int), customerId (int), salesQty (int), salesAmount (double).
 */
public class SalesDataWritable implements Writable {
    private int productId;
    private int customerId;
    private int salesQty;
    private double salesAmount;

    /** Creates an all-zero record; Hadoop needs a no-arg constructor to instantiate values before {@link #readFields}. */
    public SalesDataWritable() {
        this(0, 0, 0, 0.0);
    }

    /**
     * Creates a record with the given field values.
     *
     * @param productId   product ID
     * @param customerId  customer ID
     * @param salesQty    quantity sold
     * @param salesAmount amount sold
     */
    public SalesDataWritable(int productId, int customerId, int salesQty, double salesAmount) {
        this.productId = productId;
        this.customerId = customerId;
        this.salesQty = salesQty;
        this.salesAmount = salesAmount;
    }

    /** @return the product ID */
    public int getProductId() {
        return productId;
    }

    /** @return the customer ID */
    public int getCustomerId() {
        return customerId;
    }

    /** @return the quantity sold */
    public int getSalesQty() {
        return salesQty;
    }

    /** @return the amount sold */
    public double getSalesAmount() {
        return salesAmount;
    }

    /**
     * Overwrites all fields in place, so a single instance can be reused for many records.
     */
    public void set(int productId, int customerId, int salesQty, double salesAmount) {
        this.productId = productId;
        this.customerId = customerId;
        this.salesQty = salesQty;
        this.salesAmount = salesAmount;
    }

    /** Serializes the fields in the fixed order productId, customerId, salesQty, salesAmount. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(productId);
        out.writeInt(customerId);
        out.writeInt(salesQty);
        out.writeDouble(salesAmount);
    }

    /** Deserializes the fields in the same order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        productId = in.readInt();
        customerId = in.readInt();
        salesQty = in.readInt();
        salesAmount = in.readDouble();
    }

    /**
     * Renders the record as {@code productId,customerId,salesQty,salesAmount}, with the
     * amount rounded to two decimals.
     *
     * <p>{@link Locale#ROOT} is used so the decimal separator is always '.', regardless of the
     * JVM default locale. Otherwise a locale such as de_DE would emit ',' and corrupt the
     * comma-separated output.
     */
    @Override
    public String toString() {
        return String.format(Locale.ROOT, "%d,%d,%d,%.2f", productId, customerId, salesQty, salesAmount);
    }
}
```
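执行 MapReduce 任务前，需要先把销售数据上传到 HDFS（例如先执行 `hdfs dfs -mkdir -p /input`，再执行 `hdfs dfs -put sales.txt /input/`），并将上面两个类编译打包为 `SalesAnalysis.jar`，然后执行以下命令：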
```
hadoop jar SalesAnalysis.jar /input/sales.txt /output
```
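其中，`/input/sales.txt` 是销售数据文件在 HDFS 上的路径，`/output` 是 MapReduce 结果的输出目录。注意：输出目录在运行前不能已经存在，否则作业会因 `FileAlreadyExistsException` 而失败；如果 jar 包的 MANIFEST 中没有指定主类，需要在 jar 包名后加上主类名，即 `hadoop jar SalesAnalysis.jar SalesAnalysis /input/sales.txt /output`。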
阅读全文