Copying HBase Data with Hadoop MapReduce: A Hands-On Case

"该资源是一个关于Hadoop与Hbase整合的案例教程,主要目标是通过MapReduce程序将Hbase中的'user'表数据复制到'user2'表中,重点关注f1:name和f1:age这两列的数据迁移。" 在这个案例中,我们首先需要了解Hadoop和Hbase的基本概念。Hadoop是一个开源的分布式计算框架,它允许在大规模集群上处理和存储大量数据。Hbase则是基于Hadoop的分布式非关系型数据库,适用于大数据存储,特别适合实时查询。 步骤一,创建Hbase表: 在Hbase中,我们需要创建两表——'user'和'user2',两者都需要有相同的列族'f1'。列族是Hbase中存储数据的主要结构,列族内部可以包含任意数量的列。以下是如何创建表和插入数据的示例: ```java // 创建'user'表 create 'user', 'f1' // 插入数据 put 'user', 'rk001', 'f1:name', 'tony' put 'user', 'rk001', 'f1:age', '12' put 'user', 'rk001', 'f1:address', 'beijing' put 'user', 'rk002', 'f1:name', 'wangwu' ``` 然后,创建'user2'表,同样列族为'f1'。 步骤二,配置Maven工程并引入依赖: 为了开发MapReduce程序,我们需要创建一个Maven项目,并在pom.xml文件中添加必要的依赖,包括Hadoop和Hbase的相关库。这些库包括Hadoop的MapReduce客户端、核心库、HDFS以及Hbase的客户端、公共库和服务库等。 ```xml <dependencies> <!-- Hadoop相关 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <!-- 其他相关依赖 --> <!-- ... --> <!-- Hbase相关 --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.1</version> </dependency> <!-- ... --> </dependencies> ``` 步骤三,编写MapReduce程序: MapReduce程序包含两个主要部分:`Mapper`和`Reducer`。在这个案例中,我们可以仅使用`Mapper`,因为我们的任务是简单的数据复制,无需进行数据聚合。 1. 自定义`Mapper`类: `Mapper`负责从输入数据中提取键值对,并生成中间键值对。在Hbase的背景下,我们可以从Hbase表中读取数据,然后将这些数据作为新的键值对输出,以便`Reducer`处理。 ```java import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; public class UserToUser2Mapper extends TableMapper<LongWritable, Text> { // 初始化Hbase连接 protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create(context.getConfiguration()); Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf("user")); // ... 使用table对象读取数据 } // 重写map方法,从user表中读取f1:name和f1:age protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { // 提取name和age String name = value.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")).toString(); int age = Integer.parseInt(value.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")).toString()); // 创建新键值对,写入到用户user2表 context.write(new LongWritable(), new Text(name + "," + age)); } } ``` 2. 
Step 2: set up a Maven project and add the dependencies.

To develop the MapReduce program, create a Maven project and declare the required libraries in pom.xml. These include Hadoop's MapReduce client, common, and HDFS modules, as well as the HBase client, common, and server libraries.

```xml
<dependencies>
    <!-- Hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- other related dependencies -->
    <!-- ... -->

    <!-- HBase -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.1</version>
    </dependency>
    <!-- ... -->
</dependencies>
```

Step 3: write the MapReduce program.

A MapReduce program normally has two main parts, a `Mapper` and a `Reducer`. In this case a map-only job is enough: the task is a straight copy, with no aggregation to perform.

1. The custom `Mapper` class: when the input is an HBase table, the `Mapper` extends `TableMapper`, and `TableInputFormat` feeds it one row of 'user' at a time as a (row key, `Result`) pair, so there is no need to open a separate HBase connection in `setup()`. Our `Mapper` extracts f1:name and f1:age from each row and emits a `Put` carrying those cells, which the output format will apply to 'user2'.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class UserToUser2Mapper extends TableMapper<ImmutableBytesWritable, Put> {

    private static final byte[] F1   = Bytes.toBytes("f1");
    private static final byte[] NAME = Bytes.toBytes("name");
    private static final byte[] AGE  = Bytes.toBytes("age");

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // getValue returns the raw cell bytes (or null if the column is absent);
        // use Bytes.toString() rather than byte[].toString() if a String is needed
        byte[] name = value.getValue(F1, NAME);
        byte[] age  = value.getValue(F1, AGE);

        // Build a Put with the same row key, copying only f1:name and f1:age
        Put put = new Put(key.get());
        if (name != null) {
            put.addColumn(F1, NAME, name);
        }
        if (age != null) {
            put.addColumn(F1, AGE, age);
        }
        if (!put.isEmpty()) {
            context.write(key, put); // written to 'user2' by TableOutputFormat
        }
    }
}
```

2. Configure and run the MapReduce job: the driver wires 'user' in as the input table and 'user2' as the output table via `TableMapReduceUtil`, and marks the job map-only. Because both input and output are HBase tables, no HDFS paths are involved. Package the project and submit it to the cluster with `hadoop jar`.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class UserToUser2Job {
    public static void main(String[] args) throws Exception {
        // Picks up HBase connection settings from hbase-site.xml
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "User to User2");
        job.setJarByClass(UserToUser2Job.class);

        // Scan only the two columns we want to copy
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"));

        // Input: the 'user' table, read through TableInputFormat
        TableMapReduceUtil.initTableMapperJob("user", scan, UserToUser2Mapper.class,
                ImmutableBytesWritable.class, Put.class, job);

        // Output: the 'user2' table, written through TableOutputFormat.
        // Passing null as the reducer class makes this a map-only table job.
        TableMapReduceUtil.initTableReducerJob("user2", null, job);
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

When the MapReduce job completes, the 'user2' table contains the same f1:name and f1:age data as 'user', and the copy is done; a quick way to check the result is sketched below.

This case study shows how to combine Hadoop MapReduce with HBase for bulk data operations, data migration in particular. For more complex processing tasks, the `Mapper` and `Reducer` can be extended further to take full advantage of Hadoop and HBase; a small aggregation sketch follows at the end.
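To verify the copy, here is a minimal read-back sketch (it assumes the sample rows from Step 1 and hbase-site.xml on the classpath; the class name `VerifyUser2` is ours):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyUser2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("user2"))) {
            Result result = table.get(new Get(Bytes.toBytes("rk001")));
            // Bytes.toString converts the stored byte[] back to a String
            String name = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")));
            String age  = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")));
            System.out.println("rk001 -> name=" + name + ", age=" + age); // expected: tony, 12

            // f1:address was not part of the copy, so it should be absent
            boolean hasAddress =
                    result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("address")) != null;
            System.out.println("address copied? " + hasAddress); // expected: false
        }
    }
}
```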
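To make the concluding point concrete, here is a hypothetical aggregation sketch, not part of the original case: a `TableReducer` that counts users per age. It assumes the `Mapper` is changed to emit (age, 1) pairs, and it writes the counts into a made-up 'age_stats' table (created beforehand with column family 'f1'). The job would be wired with `TableMapReduceUtil.initTableReducerJob("age_stats", AgeCountReducer.class, job)` instead of the map-only setup above.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Hypothetical example: count how many users share each age.
// Assumes a Mapper emitting (Text age, LongWritable 1) for every row of 'user'.
public class AgeCountReducer extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text age, Iterable<LongWritable> ones, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable one : ones) {
            count += one.get();
        }
        // Row key = the age value; one cell f1:count holds the total.
        // 'age_stats' is a made-up target table for this sketch.
        byte[] row = Bytes.toBytes(age.toString());
        Put put = new Put(row);
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("count"), Bytes.toBytes(count));
        context.write(new ImmutableBytesWritable(row), put);
    }
}
```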
