With the subject in the row key and scores in a column family, compute the average score for each subject using MapReduce, reading the table from HBase. Please provide code.
Sure. Below is a Java MapReduce job that computes the average score for each subject in an HBase table. It uses HBase's MapReduce integration (TableMapper/TableReducer wired up via TableMapReduceUtil) to read rows from the table and write the averages back into it. Note that Hadoop and HBase must be installed and their services running before the job is submitted.
```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseAverageScore extends Configured implements Tool {

    // Mapper: for each row of the input table, emit (subject, score)
    public static class Map extends TableMapper<Text, DoubleWritable> {
        private final Text subject = new Text();
        private final DoubleWritable score = new DoubleWritable();

        @Override
        protected void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // The rowkey is assumed to have the form "subject:studentId";
            // the part before the colon identifies the subject
            String rowKey = Bytes.toString(row.get());
            String sub = rowKey.split(":")[0];
            // Read the latest value of column score:total; skip rows that do
            // not carry one (e.g. rows holding averages from a previous run)
            Cell cell = value.getColumnLatestCell(Bytes.toBytes("score"), Bytes.toBytes("total"));
            if (cell == null) {
                return;
            }
            subject.set(sub);
            // Scores are assumed to be stored as 8-byte doubles,
            // i.e. written with Bytes.toBytes(double)
            score.set(Bytes.toDouble(CellUtil.cloneValue(cell)));
            context.write(subject, score);
        }
    }
    // Reducer: average the scores of each subject and write the result back
    public static class Reduce extends TableReducer<Text, DoubleWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            int count = 0;
            // Total score and number of students for this subject
            for (DoubleWritable val : values) {
                sum += val.get();
                count++;
            }
            double average = sum / count;
            // Store the result in HBase: rowkey = subject, column
            // score:average, value encoded as an 8-byte double
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("score"), Bytes.toBytes("average"), Bytes.toBytes(average));
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        String tableName = "scores";
        String columnFamily = "score";
        Configuration conf = HBaseConfiguration.create(getConf());
        Job job = Job.getInstance(conf, "hbase-average-score");
        job.setJarByClass(HBaseAverageScore.class);
        // Scan only the score column family of the input table
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(columnFamily));
        // Wire up the HBase input/output formats; the reducer writes its
        // Puts back into the same table
        TableMapReduceUtil.initTableMapperJob(tableName, scan, Map.class,
                Text.class, DoubleWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(tableName, Reduce.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new HBaseAverageScore(), args);
        System.exit(exitCode);
    }
}
```
In the code above, the Mapper reads each row of the HBase table, extracts the subject from the rowkey and the score from the score:total column, and emits (subject, score) pairs. The Reducer sums the scores for each subject, divides by the number of students to get the average, and writes the result back into the table under the column score:average. The run() method wires the job together with TableMapReduceUtil; after packaging the class into a jar, the job can be submitted with the standard `hadoop jar` command.
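For reference, here is a minimal sketch of how test data matching the layout the job assumes could be loaded with the plain HBase client API. The rowkey format "subject:studentId", the sample subjects and scores, and the LoadSampleScores class name are illustrative assumptions, not requirements; the one real constraint is that values are written with Bytes.toBytes(double) so the mapper's Bytes.toDouble() call can decode them. The "scores" table with its "score" column family is assumed to exist already (e.g. created beforehand in the HBase shell).
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class LoadSampleScores {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("scores"))) {
            // Hypothetical sample rows: rowkey "subject:studentId"
            String[][] rows = { {"math:1001", "92.0"}, {"math:1002", "81.5"},
                                {"english:1001", "75.0"} };
            for (String[] r : rows) {
                Put put = new Put(Bytes.toBytes(r[0]));
                // Encode the score as an 8-byte double so the MapReduce
                // job's Bytes.toDouble() call matches
                put.addColumn(Bytes.toBytes("score"), Bytes.toBytes("total"),
                              Bytes.toBytes(Double.parseDouble(r[1])));
                table.put(put);
            }
        }
    }
}
```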
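Once the job has completed, the computed averages can be read back from the same table. A sketch under the same encoding assumption (the subject name "math" is again just an example):
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAverage {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("scores"))) {
            // The reducer wrote the average under rowkey = subject name
            Result result = table.get(new Get(Bytes.toBytes("math")));
            byte[] raw = result.getValue(Bytes.toBytes("score"), Bytes.toBytes("average"));
            if (raw != null) {
                System.out.println("math average = " + Bytes.toDouble(raw));
            }
        }
    }
}
```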