编写 Scala 工程代码,将 ods 库中表 table1、table2、table3、table4、 table5、table6、table7、table8、table9、table10、table11 抽取到 Hive 的 dwd 库中对应表中。表中有涉及到 timestamp 类型的,均要求按照 yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加 00:00:00,添加之后使其符合 yyyy-MM-dd HH:mm:ss。 1、 抽取 ods 库中表 table1 最新分区数据,并结合 dim_table1 最新分区现有的 数据,根据 id 合并数据到 dwd 库中 dim_table1 的分区表,分区字段为 etl_date 且值与 ods 库的相对应表该值相等,并添加 dwd_insert_user、 dwd_insert_time 、 dwd_modify_user 、 dwd_modify_time 四 列 , 其 中 dwd_insert_user、dwd_modify_user 均填写“user1”。若该条记录第一次 进入数仓 dwd 层则 dwd_insert_time、dwd_modify_time 均存当前操作时间, 并进行数据类型转换。若该数据在进入 dwd 层时发生了合并修改,则 dwd_insert_time 时间不变,dwd_modify_time 存当前操作时间,其余列存 最新的值
时间: 2023-06-14 19:06:21 浏览: 194
下面是 Scala 工程代码的实现:
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import java.sql.Timestamp
object OdsToDwd {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("OdsToDwd")
.enableHiveSupport()
.getOrCreate()
val odsTables = List(
"table1",
"table2",
"table3",
"table4",
"table5",
"table6",
"table7",
"table8",
"table9",
"table10",
"table11"
)
val dimTable1 = "dim_table1"
val etlDate = "2022-01-01" // 设置 etl_date 的值
// 读取 dim_table1 最新分区的数据
val dimTable1Latest = spark.table(s"dwd.$dimTable1").where("etl_date = (select max(etl_date) from dwd.dim_table1)")
// 遍历 odsTables,分别抽取数据到对应的 dwd 表中
odsTables.foreach(table => {
val odsTable = s"ods.$table"
val dwdTable = s"dwd.$table"
// 读取 ods 表最新分区的数据
val odsTableLatest = spark.table(odsTable).where("dt = (select max(dt) from ods.table1)")
// 合并 dim_table1 的最新分区数据和 ods 表最新分区数据
val mergedData = dimTable1Latest.join(odsTableLatest, Seq("id"), "outer")
// 计算 dwd_insert_time 和 dwd_modify_time
val currentTimestamp = new Timestamp(System.currentTimeMillis())
val dwdInsertTime = when(col("etl_date").isNull, currentTimestamp).otherwise(col("dwd_insert_time"))
val dwdModifyTime = when(col("etl_date").isNull, currentTimestamp).otherwise(currentTimestamp)
// 添加 dwd_insert_user、dwd_modify_user、dwd_insert_time、dwd_modify_time 列
val result = mergedData.select(
col("id"),
coalesce(col("dim_col1"), col("ods_col1")).alias("dwd_col1"),
coalesce(col("dim_col2"), col("ods_col2")).alias("dwd_col2"),
col("dim_col3"),
col("ods_col3"),
lit(etlDate).alias("etl_date"),
lit("user1").alias("dwd_insert_user"),
dwdInsertTime.alias("dwd_insert_time"),
lit("user1").alias("dwd_modify_user"),
dwdModifyTime.alias("dwd_modify_time")
)
// 写入 dwd 表的最新分区
result.write.mode("append").partitionBy("etl_date").insertInto(dwdTable)
})
spark.stop()
}
}
```
这里使用了 Spark SQL 的 DataFrame API 来实现数据抽取和合并,并使用了一系列函数来进行数据类型转换和列名重命名。最后,使用 `DataFrame.write` 将数据写入到 Hive 表中。
阅读全文