spark读取hbase使用TableInputFormat的方式 读取列为info:date 值为2023/07/01 的数据
时间: 2024-04-21 07:22:15 浏览: 18
要使用Spark读取HBase中的数据,可以使用TableInputFormat的方式。下面是一个示例代码:
```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}
object SparkReadHBase {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkReadHBase").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "your-zookeeper-quorum")
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "your-zookeeper-port")
hbaseConf.set(TableInputFormat.INPUT_TABLE, "your-hbase-table-name")
val scan = new Scan()
scan.addColumn("info".getBytes, "date".getBytes)
scan.setCaching(500)
hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
val filteredRDD = hbaseRDD.filter{ case (_, result) =>
val value = Bytes.toString(result.getValue("info".getBytes, "date".getBytes))
value == "2023/07/01"
}
filteredRDD.foreach{ case (_, result) =>
val value = Bytes.toString(result.getValue("info".getBytes, "date".getBytes))
println(value)
}
sc.stop()
}
def convertScanToString(scan: Scan): String = {
val proto = ProtobufUtil.toScan(scan)
Base64.encodeBytes(proto.toByteArray)
}
}
```
请注意,你需要替换代码中的以下部分:
1. `your-zookeeper-quorum`:将其替换为你的Zookeeper的地址。
2. `your-zookeeper-port`:将其替换为你的Zookeeper的端口。
3. `your-hbase-table-name`:将其替换为你要读取的HBase表的名称。
这段代码会读取HBase表中列族为 "info",列名为 "date",值为 "2023/07/01" 的数据,并打印出来。你可以根据自己的需求进行修改和调整。