Explain this code:
```java
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class mypreduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text k3, Iterable<Text> v3,
            Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        int avg = 0;
        for (Text i : v3) {
            System.out.println(k3.toString() + "," + i.toString());
            int sum = 0;
            int count = 0;
            String[] scores = i.toString().split(" ");
            System.out.println(Arrays.toString(scores));
            // Add up the student's scores one by one
            for (String score : scores) {
                sum = sum + Integer.parseInt(score);
                count++;
                //System.out.println("reduce52");
            }
            System.out.println(sum);
            // Compute the student's average score
            avg = sum / count;
            System.out.println(avg);
        }
        // Write the student's computed average into the HBase table
        Put put = new Put(Bytes.toBytes(k3.toString()));
        put.addColumn(Bytes.toBytes("avg"), null, Bytes.toBytes(String.valueOf(avg)));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), put);
    }
}
```
This is the reduce stage of a Hadoop MapReduce job; it averages a student's exam scores and writes the result into an HBase table. Specifically, the class extends HBase's TableReducer, taking <Text, Text> key-value pairs as input and emitting <ImmutableBytesWritable, Mutation> pairs as output. In the reduce() method, the code iterates over the student's score list, sums the scores in each value, and computes the average, then writes that average to the HBase table. The Put object represents one row of data, carrying the row key, column family, column qualifier, and value; ImmutableBytesWritable represents the HBase row key, and Mutation is an abstract class whose subclass Put represents a row to be written. One subtlety: sum and count are reset inside the loop over v3, so if several values arrive for the same key, the avg that gets written reflects only the last value processed.
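For completeness, a reducer like this still needs a driver that wires it into a MapReduce job. Below is a minimal sketch of such a driver; the ScoreMapper class, the args[0] input path, and the output table name "score" are all assumptions for illustration and do not appear in the original code:
```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class AvgScoreDriver {
    // Hypothetical mapper: splits each input line "name score1 score2 ..." into
    // <name, "score1 score2 ..."> pairs, matching what the reducer expects
    public static class ScoreMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            int firstSpace = line.indexOf(' ');
            if (firstSpace > 0) {
                context.write(new Text(line.substring(0, firstSpace)),
                              new Text(line.substring(firstSpace + 1)));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "average-score");
        job.setJarByClass(AvgScoreDriver.class);
        job.setMapperClass(ScoreMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Wires the TableReducer from the question into the job and directs
        // its Puts to the target HBase table ("score" is assumed here)
        TableMapReduceUtil.initTableReducerJob("score", mypreduce.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
TableMapReduceUtil.initTableReducerJob sets the output format and target table so that the Puts emitted by context.write() land in HBase.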
Related questions
Explain this code:
```java
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CourceReduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text k3, Iterable<Text> v3,
            Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        int avg = 0, avgp = 0, avgj = 0, avgl = 0;
        int sumc = 0; int sump = 0; int sumj = 0; int suml = 0;
        int sum = 0;
        int count = 0;
        for (Text i : v3) {
            // Get the student's scores and split them on spaces
            String[] scores = i.toString().split(" ");
            // Iterate over the subject scores and accumulate the total
            for (String sc : scores) {
                sum = sum + Integer.parseInt(sc);
                count++;
            }
            avg = sum / count;
        }
        // Wrap the average in a Put and write it to the HBase table
        Put put = new Put(Bytes.toBytes(k3.toString()));
        put.addColumn(Bytes.toBytes("avg"), null, Bytes.toBytes(String.valueOf(avg)));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(k3.toString())), put);
    }
}
```
This code implements the Reducer of a Hadoop MapReduce job that computes a student's average score across subjects and stores the result in an HBase table.
Specifically, the Reducer extends HBase's TableReducer class and overrides its reduce() method. The method's parameters are the key-value pair and a context object: the key is the student's name, the value holds the student's subject scores, and the context is used to write the result to the HBase table.
Inside reduce(), four variables avg, avgp, avgj, and avgl are declared first, apparently intended as per-subject averages, but only avg is ever used; likewise, sumc, sump, sumj, and suml are declared as per-subject totals yet never written to. The variables that actually matter are sum and count, which hold the total of all scores and the number of scores.
The code then iterates over the values, i.e. the student's subject scores, splits each on spaces, adds every score to sum, and increments count so the average can be computed. Finally, the average is wrapped in a Put and written to the HBase table.
Note that the Reducer's input type is <Text, Text>, the student name and the subject scores, while its output type is <ImmutableBytesWritable, Mutation>, the student name as row key together with the average-score Put.
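As a concrete illustration (the name and scores here are invented, not from the original): if reduce() receives k3 = "Tom" with a single value "80 90 70", then sum = 80 + 90 + 70 = 240, count = 3, and avg = 240 / 3 = 80 (integer division, so any remainder is discarded). The resulting Put has row key "Tom", column family "avg", a null qualifier, and the string value "80".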
Use the HBase Java API to complete the following tasks: 1. Create a table named userfollower. 2. User Mary has one follower, Alice, and Mary also appears in Alice's follow list. 3. User Jack has two followers, Bob and Ben. 4. Jack also follows Mary. 5. Look up who follows Jack and whom Jack follows. 6. Bob unfollows Jack. 7. Scan the whole table. Use the following imports: import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; and obtain tables with Table table = connection.getTable(TableName.valueOf(tableName));
Here is the code:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

public class HBaseExample {
    private static Configuration conf = null;
    private static Connection connection = null;

    static {
        conf = HBaseConfiguration.create();
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        String tableName = "userfollower";
        // 1. Create the table
        createTable(tableName);
        // 2. Mary has one follower, Alice (so Mary appears in Alice's follow list)
        addUserFollower("Mary", "Alice");
        // 3. Jack has two followers, Bob and Ben
        addUserFollower("Jack", "Bob");
        addUserFollower("Jack", "Ben");
        // 4. Jack also follows Mary, i.e. Jack is one of Mary's followers
        addUserFollower("Mary", "Jack");
        // 5. Who follows Jack, and whom does Jack follow
        System.out.println("Jack is followed by: " + getFollowers("Jack"));
        System.out.println("Jack follows: " + getFollowed("Jack"));
        // 6. Bob unfollows Jack: remove Bob from Jack's follower list
        removeUserFollower("Jack", "Bob");
        // 7. Scan the whole table
        scanTable(tableName);
        connection.close();
    }

    // Create the table with a single column family "cf" if it does not exist yet
    private static void createTable(String tableName) throws IOException {
        Admin admin = connection.getAdmin();
        TableName table = TableName.valueOf(tableName);
        if (!admin.tableExists(table)) {
            ColumnFamilyDescriptor family =
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf")).build();
            admin.createTable(TableDescriptorBuilder.newBuilder(table)
                    .setColumnFamily(family).build());
        }
        admin.close();
    }

    // Record that `follower` follows `user`. The follower's name is used as the
    // column qualifier so one user can hold several followers; a single fixed
    // qualifier would be overwritten by every subsequent put.
    private static void addUserFollower(String user, String follower) throws IOException {
        Table table = connection.getTable(TableName.valueOf("userfollower"));
        Put put = new Put(Bytes.toBytes(user));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(follower), Bytes.toBytes("1"));
        table.put(put);
        table.close();
    }

    // `follower` unfollows `user`: delete the follower's column from the user's row
    private static void removeUserFollower(String user, String follower) throws IOException {
        Table table = connection.getTable(TableName.valueOf("userfollower"));
        Delete delete = new Delete(Bytes.toBytes(user));
        delete.addColumns(Bytes.toBytes("cf"), Bytes.toBytes(follower));
        table.delete(delete);
        table.close();
    }

    // Who follows `user`: read the user's row and collect every qualifier in "cf"
    private static String getFollowers(String user) throws IOException {
        Table table = connection.getTable(TableName.valueOf("userfollower"));
        Result result = table.get(new Get(Bytes.toBytes(user)));
        StringBuilder sb = new StringBuilder();
        for (Cell cell : result.rawCells()) {
            sb.append(Bytes.toString(CellUtil.cloneQualifier(cell))).append(" ");
        }
        table.close();
        return sb.toString().trim();
    }

    // Whom does `user` follow: scan all rows, keeping those whose follower set contains `user`
    private static String getFollowed(String user) throws IOException {
        Table table = connection.getTable(TableName.valueOf("userfollower"));
        ResultScanner scanner = table.getScanner(new Scan());
        StringBuilder sb = new StringBuilder();
        for (Result result : scanner) {
            if (result.containsColumn(Bytes.toBytes("cf"), Bytes.toBytes(user))) {
                sb.append(Bytes.toString(result.getRow())).append(" ");
            }
        }
        scanner.close();
        table.close();
        return sb.toString().trim();
    }

    // Print every row key together with its full follower list
    private static void scanTable(String tableName) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result result : scanner) {
            StringBuilder followers = new StringBuilder();
            for (Cell cell : result.rawCells()) {
                followers.append(Bytes.toString(CellUtil.cloneQualifier(cell))).append(" ");
            }
            System.out.println(Bytes.toString(result.getRow())
                    + " is followed by: " + followers.toString().trim());
        }
        scanner.close();
        table.close();
    }
}
```
Note that the HBase service must be running before the program is executed.
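A few words on the design choice above: the row key is the user and each follower is stored under its own column qualifier in the cf family, rather than under one fixed qualifier such as "follower". Repeated puts to a single fixed qualifier would overwrite each other, so Jack could never keep both Bob and Ben. Under this layout, and assuming a local HBase with default configuration, the final scan should show Jack followed only by Ben (after Bob unfollows) and Mary followed by Alice and Jack.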