假设有一个名为"employee_info"的HBase表,其中有两个列族:"personal_info"和"work_info"。"personal_info"列族下包含"emp_id"、"name"、"age"、"gender"以及"email"五个列,"work_info"列族下包含"department"、"job_title"、"salary"和"hire_date"四个列。现在需要使用HBase Java API对该表进行以下操作: 1.插入新记录到HBase表中,插入完成之后返回最新行键。 2.获取所有员工的姓名和工资信息。 3.根据员工ID查询其基本信息,并列出薪资高于平均薪资的员工。 4.获取每个部门的人数和平均薪资。 请使用HBase Java API编写代码实现以上四个需求。
时间: 2023-07-02 20:05:19 浏览: 115
以下是对应的代码实现:
1.插入新记录到HBase表中,插入完成之后返回最新行键。
```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Inserts one employee record into the {@code employee_info} table and prints the row key
 * of the record that was just written.
 *
 * <p>The employee ID is used as the row key, so the record can be read back with a
 * {@code Get} on that ID instead of a table scan.
 */
public class HBaseExample {
    private static final byte[] PERSONAL_INFO = Bytes.toBytes("personal_info");
    private static final byte[] WORK_INFO = Bytes.toBytes("work_info");

    /**
     * Writes the sample employee and reports its row key on standard output.
     *
     * @param args unused
     * @throws Exception if the connection cannot be opened or the write fails
     */
    public static void main(String[] args) throws Exception {
        String empId = "001";
        byte[] rowKey = Bytes.toBytes(empId);

        Put put = new Put(rowKey);
        addString(put, PERSONAL_INFO, "emp_id", empId);
        addString(put, PERSONAL_INFO, "name", "张三");
        addString(put, PERSONAL_INFO, "age", "25");
        addString(put, PERSONAL_INFO, "gender", "男");
        addString(put, PERSONAL_INFO, "email", "zhangsan@qq.com");
        addString(put, WORK_INFO, "department", "研发部");
        addString(put, WORK_INFO, "job_title", "软件工程师");
        addString(put, WORK_INFO, "salary", "10000");
        addString(put, WORK_INFO, "hire_date", "2020-01-01");

        // try-with-resources closes the table and the connection even when put() throws.
        try (Connection connection = ConnectionFactory.createConnection();
             Table table = connection.getTable(TableName.valueOf("employee_info"))) {
            table.put(put);
            // The key of the new record is already known; scanning the whole table to
            // rediscover it would list every row, not just the one inserted here.
            System.out.println("Row Key:" + Bytes.toString(rowKey));
        }
    }

    /** Adds a string-valued column to {@code put}. */
    private static void addString(Put put, byte[] family, String qualifier, String value) {
        put.addColumn(family, Bytes.toBytes(qualifier), Bytes.toBytes(value));
    }
}
```
2.获取所有员工的姓名和工资信息。
```java
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Prints the name and salary of every employee stored in the {@code employee_info} table.
 */
public class HBaseExample {
    private static final byte[] PERSONAL_INFO = Bytes.toBytes("personal_info");
    private static final byte[] WORK_INFO = Bytes.toBytes("work_info");
    private static final byte[] NAME = Bytes.toBytes("name");
    private static final byte[] SALARY = Bytes.toBytes("salary");

    /**
     * Scans the two requested columns and prints one line per returned row.
     *
     * @param args unused
     * @throws Exception if the connection cannot be opened or the scan fails
     */
    public static void main(String[] args) throws Exception {
        // Restrict the scan to the two columns that are actually printed.
        Scan scan = new Scan();
        scan.addColumn(PERSONAL_INFO, NAME);
        scan.addColumn(WORK_INFO, SALARY);

        // try-with-resources closes scanner, table and connection in reverse order,
        // including when the scan fails part-way through.
        try (Connection connection = ConnectionFactory.createConnection();
             Table table = connection.getTable(TableName.valueOf("employee_info"));
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                String name = readString(result, PERSONAL_INFO, NAME);
                String salary = readString(result, WORK_INFO, SALARY);
                System.out.println("Name:" + name + ", Salary:" + salary);
            }
        }
    }

    /**
     * Reads the latest value of a column as a string.
     *
     * @return the column value, or {@code "N/A"} when the row has no such column
     */
    private static String readString(Result result, byte[] family, byte[] qualifier) {
        // A row may carry only one of the two scanned columns; getValue returns null then.
        byte[] raw = result.getValue(family, qualifier);
        return raw == null ? "N/A" : Bytes.toString(raw);
    }
}
```
3.根据员工ID查询其基本信息,并列出薪资高于平均薪资的员工。
```java
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
/**
 * Looks up one employee by ID and lists the employees whose salary is above the
 * average salary of the {@code employee_info} table.
 *
 * <p>The employee ID is used as the row key for the lookup. Salaries are read as
 * strings, so the average and the comparison are computed numerically on the client;
 * a byte-wise server-side comparison of the stored strings would not order them
 * numerically.
 */
public class HBaseExample {
    private static final byte[] PERSONAL_INFO = Bytes.toBytes("personal_info");
    private static final byte[] WORK_INFO = Bytes.toBytes("work_info");
    private static final byte[] EMP_ID = Bytes.toBytes("emp_id");
    private static final byte[] SALARY = Bytes.toBytes("salary");

    /**
     * Prints the details of employee {@code 001}, then the IDs of all above-average earners.
     *
     * @param args unused
     * @throws Exception if the connection cannot be opened or a read fails
     */
    public static void main(String[] args) throws Exception {
        String empId = "001";
        // try-with-resources closes the table and the connection even when a read throws.
        try (Connection connection = ConnectionFactory.createConnection();
             Table table = connection.getTable(TableName.valueOf("employee_info"))) {
            printEmployee(table, empId);
            printAboveAverageEarners(table);
        }
    }

    /** Fetches the row keyed by {@code empId} and prints its columns, or a notice if absent. */
    private static void printEmployee(Table table, String empId) throws Exception {
        Result result = table.get(new Get(Bytes.toBytes(empId)));
        if (result.isEmpty()) {
            // Without this guard every cell lookup below would dereference null.
            System.out.println("No employee found with id " + empId);
            return;
        }
        System.out.println("Name:" + readString(result, PERSONAL_INFO, "name"));
        System.out.println("Age:" + readString(result, PERSONAL_INFO, "age"));
        System.out.println("Gender:" + readString(result, PERSONAL_INFO, "gender"));
        System.out.println("Email:" + readString(result, PERSONAL_INFO, "email"));
        System.out.println("Department:" + readString(result, WORK_INFO, "department"));
        System.out.println("Job Title:" + readString(result, WORK_INFO, "job_title"));
        System.out.println("Salary:" + readString(result, WORK_INFO, "salary"));
        System.out.println("Hire Date:" + readString(result, WORK_INFO, "hire_date"));
    }

    /** Scans all salaries once, computes their mean, and prints the IDs above it. */
    private static void printAboveAverageEarners(Table table) throws Exception {
        Scan scan = new Scan();
        scan.addColumn(PERSONAL_INFO, EMP_ID);
        scan.addColumn(WORK_INFO, SALARY);

        // Parallel lists: ids.get(i) earns salaries.get(i). Everything is kept in memory
        // so the table is scanned only once; for very large tables scan twice instead.
        List<String> ids = new ArrayList<>();
        List<Double> salaries = new ArrayList<>();
        double total = 0.0;
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result row : scanner) {
                Double salary = parseSalary(row.getValue(WORK_INFO, SALARY));
                if (salary == null) {
                    continue; // no usable salary: cannot take part in the average
                }
                byte[] rawId = row.getValue(PERSONAL_INFO, EMP_ID);
                // Fall back to the row key when the emp_id column is missing.
                ids.add(Bytes.toString(rawId != null ? rawId : row.getRow()));
                salaries.add(salary);
                total += salary;
            }
        }

        if (ids.isEmpty()) {
            System.out.println("No salary data found.");
            return;
        }
        double average = total / ids.size();
        System.out.println("Employees with salary higher than " + average + ":");
        for (int i = 0; i < ids.size(); i++) {
            if (salaries.get(i) > average) {
                System.out.println(ids.get(i));
            }
        }
    }

    /**
     * Reads the latest value of a column as a string.
     *
     * @return the column value, or {@code "N/A"} when the row has no such column
     */
    private static String readString(Result result, byte[] family, String qualifier) {
        byte[] raw = result.getValue(family, Bytes.toBytes(qualifier));
        return raw == null ? "N/A" : Bytes.toString(raw);
    }

    /**
     * Parses a string-encoded salary.
     *
     * @return the numeric salary, or {@code null} when the cell is absent or not a number
     */
    private static Double parseSalary(byte[] raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Double.valueOf(Bytes.toString(raw).trim());
        } catch (NumberFormatException ignored) {
            // A malformed cell should not abort the whole report; the row is skipped.
            return null;
        }
    }
}
```
4.获取每个部门的人数和平均薪资。
```java
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.AggregateFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.HashMap;
import java.util.Map;
/**
 * Prints the head count and the average salary of every department found in the
 * {@code employee_info} table.
 *
 * <p>The aggregation is done on the client while iterating a scan of the
 * {@code department} and {@code salary} columns.
 *
 * <p>NOTE(review): the {@code org.apache.hadoop.hbase.filter.AggregateFilter} import that
 * accompanies this snippet is not used by this class, and no such class seems to exist in
 * the HBase client API. Drop that import if it does not resolve.
 */
public class HBaseExample {
    private static final byte[] WORK_INFO = Bytes.toBytes("work_info");
    private static final byte[] DEPARTMENT = Bytes.toBytes("department");
    private static final byte[] SALARY = Bytes.toBytes("salary");

    /** Running totals for one department. */
    private static final class DepartmentStats {
        /** Rows that name this department. */
        private int headCount;
        /** Rows of this department that also carry a parseable salary. */
        private int salariedCount;
        /** Sum of the parseable salaries. */
        private double salaryTotal;
    }

    /**
     * Scans the table once and prints one summary line per department.
     *
     * @param args unused
     * @throws Exception if the connection cannot be opened or the scan fails
     */
    public static void main(String[] args) throws Exception {
        Scan scan = new Scan();
        scan.addColumn(WORK_INFO, DEPARTMENT);
        scan.addColumn(WORK_INFO, SALARY);

        Map<String, DepartmentStats> statsByDepartment = new HashMap<>();
        // try-with-resources closes scanner, table and connection even when the scan fails.
        try (Connection connection = ConnectionFactory.createConnection();
             Table table = connection.getTable(TableName.valueOf("employee_info"));
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                byte[] rawDepartment = result.getValue(WORK_INFO, DEPARTMENT);
                if (rawDepartment == null) {
                    continue; // a row without a department cannot be attributed to one
                }
                String department = Bytes.toString(rawDepartment);
                DepartmentStats stats = statsByDepartment.get(department);
                if (stats == null) {
                    stats = new DepartmentStats();
                    statsByDepartment.put(department, stats);
                }
                stats.headCount++;

                Double salary = parseSalary(result.getValue(WORK_INFO, SALARY));
                if (salary != null) {
                    stats.salariedCount++;
                    stats.salaryTotal += salary;
                }
            }
        }

        for (Map.Entry<String, DepartmentStats> entry : statsByDepartment.entrySet()) {
            DepartmentStats stats = entry.getValue();
            // Average only over rows that had a usable salary; NaN marks "no salary data".
            double avgSalary = stats.salariedCount == 0
                    ? Double.NaN
                    : stats.salaryTotal / stats.salariedCount;
            System.out.println("Department:" + entry.getKey()
                    + ", Employee Count:" + stats.headCount
                    + ", Average Salary:" + avgSalary);
        }
    }

    /**
     * Parses a string-encoded salary.
     *
     * @return the numeric salary, or {@code null} when the cell is absent or not a number
     */
    private static Double parseSalary(byte[] raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Double.valueOf(Bytes.toString(raw).trim());
        } catch (NumberFormatException ignored) {
            // A malformed cell should not abort the whole report; it is left out of the average.
            return null;
        }
    }
}
```
阅读全文