scala spark sql 抽取 ods 库中表 table9 最新分区的数据,并结合 HBase 中 table9_offline 表中的数据合并抽取到 dwd 库中 table9 的分区表,分区字段为 etl_date 且 值 与 ods 库 的 相 对 应 表 该 值 相 等 , 并 添 加 dwd_insert_user 、 dwd_insert_time 、 dwd_modify_user 、 dwd_modify_time 四 列 , 其 中 dwd_insert_user、dwd_modify_user 均填写“user1”,dwd_insert_time、 dwd_modify_time 均填写当前操作时间(年月日必须是今天,时分秒只需在 比赛时间范围内即可),抽取 HBase 中的数据时,只抽取 XXXX 年 XX 月 XX 日的数据(以 rowkey 为准),并进行数据类型转换。
时间: 2023-06-13 09:06:38 浏览: 95
以下是 Scala Spark SQL 的代码实现:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.time.LocalDate
import java.time.format.DateTimeFormatter
object Table9Merge {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("Table9Merge").getOrCreate()
val odsTable = "ods.table9"
val hbaseTable = "hbase.table9_offline"
// 获取最新分区的数据
val latestPartition = spark.sql(s"SHOW PARTITIONS $odsTable").collect().last.getString(0)
val odsDf = spark.table(odsTable).filter(col("etl_date") === latestPartition.split("=").last)
// 获取需要抽取的日期
val extractDate = LocalDate.of(XXXX, XX, XX)
// 从 HBase 中抽取数据
val hbaseDf = spark.read.format("org.apache.hadoop.hbase.spark")
.option("hbase.table", hbaseTable)
.option("hbase.mapreduce.inputtable", hbaseTable)
.option("hbase.mapreduce.scan.row.start", extractDate.format(DateTimeFormatter.ofPattern("yyyyMMdd")))
.option("hbase.mapreduce.scan.row.stop", extractDate.plusDays(1).format(DateTimeFormatter.ofPattern("yyyyMMdd")))
.load()
// 合并抽取到的数据
val dwdDf = odsDf.join(hbaseDf, Seq("id")).select(
odsDf("id"),
odsDf("name"),
odsDf("age"),
odsDf("etl_date"),
hbaseDf("offline_date"),
hbaseDf("offline_time")
)
// 添加四列数据
val user = "user1"
val now = LocalDate.now().atStartOfDay()
val dwdDfWithColumns = dwdDf.withColumn("dwd_insert_user", lit(user))
.withColumn("dwd_insert_time", lit(now))
.withColumn("dwd_modify_user", lit(user))
.withColumn("dwd_modify_time", lit(now))
// 写入 DWD 库中的表
dwdDfWithColumns.write.partitionBy("etl_date").insertInto("dwd.table9")
spark.stop()
}
}
```
需要替换的地方:
- 将 `ods.table9` 和 `hbase.table9_offline` 替换为实际的表名。
- 将 `XXXX`、`XX`、`XX` 替换为需要抽取的日期。