spark 读取hive数据 写入hbase
时间: 2023-04-30 12:00:36 浏览: 163
spark访问hbase
Spark可以通过HiveContext来读取Hive数据,然后通过HBase API将数据写入HBase。具体步骤如下:
1. 创建SparkConf和SparkContext对象。
2. 创建HiveContext对象,通过HiveContext对象读取Hive数据。
3. 将Hive数据转换为RDD。
4. 创建HBaseConfiguration对象,设置HBase相关配置。
5. 创建HBase表,如果表已经存在,则不需要创建。
6. 将RDD中的数据写入HBase表中。
示例代码如下:
```
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
object SparkHiveToHBase {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkHiveToHBase")
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
// 读取Hive数据
val df = hiveContext.sql("SELECT * FROM my_table")
// 将DataFrame转换为RDD
val rdd = df.rdd
// 创建HBaseConfiguration对象
val hbaseConf = HBaseConfiguration.create()
// 设置HBase相关配置
hbaseConf.set("hbase.zookeeper.quorum", "localhost")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
// 创建HBase表
val tableName = TableName.valueOf("my_table")
val connection = ConnectionFactory.createConnection(hbaseConf)
val admin = connection.getAdmin
if (!admin.tableExists(tableName)) {
val tableDesc = new HTableDescriptor(tableName)
tableDesc.addFamily(new HColumnDescriptor("cf".getBytes))
admin.createTable(tableDesc)
}
// 将RDD中的数据写入HBase表中
rdd.foreachPartition(partition => {
val connection = ConnectionFactory.createConnection(hbaseConf)
val table = connection.getTable(tableName)
partition.foreach(row => {
val put = new Put(Bytes.toBytes(row.getString()))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes(row.getString(1)))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col2"), Bytes.toBytes(row.getString(2)))
table.put(put)
})
table.close()
connection.close()
})
sc.stop()
}
}
```
阅读全文