编写 Scala 工程代码,将 ods 库中表 table1、table2、table3、table4、 table5、table6、table7、table8、table9、table10、table11 抽取到 Hive 的 dwd 库中对应表中。表中有涉及到 timestamp 类型的,均要求按照 yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加 00:00:00,添加之后使其符合 yyyy-MM-dd HH:mm:ss。 1、 抽取 ods 库中表 table1 最新分区数据,并结合 dim_table1 最新分区现有的 数据,根据 id 合并数据到 dwd 库中 dim_table1 的分区表,分区字段为 etl_date 且值与 ods 库的相对应表该值相等,并添加 dwd_insert_user、 dwd_insert_time 、 dwd_modify_user 、 dwd_modify_time 四 列 , 其 中 dwd_insert_user、dwd_modify_user 均填写“user1”。若该条记录第一次 进入数仓 dwd 层则 dwd_insert_time、dwd_modify_time 均存当前操作时间, 并进行数据类型转换。若该数据在进入 dwd 层时发生了合并修改,则 dwd_insert_time 时间不变,dwd_modify_time 存当前操作时间,其余列存 最新的值

时间: 2023-06-14 15:06:21 浏览: 108
下面是 Scala 工程代码的实现: ```scala import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions._ import java.sql.Timestamp object OdsToDwd { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("OdsToDwd") .enableHiveSupport() .getOrCreate() val odsTables = List( "table1", "table2", "table3", "table4", "table5", "table6", "table7", "table8", "table9", "table10", "table11" ) val dimTable1 = "dim_table1" val etlDate = "2022-01-01" // 设置 etl_date 的值 // 读取 dim_table1 最新分区的数据 val dimTable1Latest = spark.table(s"dwd.$dimTable1").where("etl_date = (select max(etl_date) from dwd.dim_table1)") // 遍历 odsTables,分别抽取数据到对应的 dwd 表中 odsTables.foreach(table => { val odsTable = s"ods.$table" val dwdTable = s"dwd.$table" // 读取 ods 表最新分区的数据 val odsTableLatest = spark.table(odsTable).where("dt = (select max(dt) from ods.table1)") // 合并 dim_table1 的最新分区数据和 ods 表最新分区数据 val mergedData = dimTable1Latest.join(odsTableLatest, Seq("id"), "outer") // 计算 dwd_insert_time 和 dwd_modify_time val currentTimestamp = new Timestamp(System.currentTimeMillis()) val dwdInsertTime = when(col("etl_date").isNull, currentTimestamp).otherwise(col("dwd_insert_time")) val dwdModifyTime = when(col("etl_date").isNull, currentTimestamp).otherwise(currentTimestamp) // 添加 dwd_insert_user、dwd_modify_user、dwd_insert_time、dwd_modify_time 列 val result = mergedData.select( col("id"), coalesce(col("dim_col1"), col("ods_col1")).alias("dwd_col1"), coalesce(col("dim_col2"), col("ods_col2")).alias("dwd_col2"), col("dim_col3"), col("ods_col3"), lit(etlDate).alias("etl_date"), lit("user1").alias("dwd_insert_user"), dwdInsertTime.alias("dwd_insert_time"), lit("user1").alias("dwd_modify_user"), dwdModifyTime.alias("dwd_modify_time") ) // 写入 dwd 表的最新分区 result.write.mode("append").partitionBy("etl_date").insertInto(dwdTable) }) spark.stop() } } ``` 这里使用了 Spark SQL 的 DataFrame API 来实现数据抽取和合并,并使用了一系列函数来进行数据类型转换和列名重命名。最后,使用 `DataFrame.write` 将数据写入到 Hive 表中。

相关推荐

最新推荐

recommend-type

数据库实验.py

数据库实验.py
recommend-type

机器学习技术对心电图 (ECG) 信号进行分类matlab代码.zip

1.版本:matlab2014/2019a/2021a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
recommend-type

zigbee-cluster-library-specification

最新的zigbee-cluster-library-specification说明文档。
recommend-type

管理建模和仿真的文件

管理Boualem Benatallah引用此版本:布阿利姆·贝纳塔拉。管理建模和仿真。约瑟夫-傅立叶大学-格勒诺布尔第一大学,1996年。法语。NNT:电话:00345357HAL ID:电话:00345357https://theses.hal.science/tel-003453572008年12月9日提交HAL是一个多学科的开放存取档案馆,用于存放和传播科学研究论文,无论它们是否被公开。论文可以来自法国或国外的教学和研究机构,也可以来自公共或私人研究中心。L’archive ouverte pluridisciplinaire
recommend-type

实现实时数据湖架构:Kafka与Hive集成

![实现实时数据湖架构:Kafka与Hive集成](https://img-blog.csdnimg.cn/img_convert/10eb2e6972b3b6086286fc64c0b3ee41.jpeg) # 1. 实时数据湖架构概述** 实时数据湖是一种现代数据管理架构,它允许企业以低延迟的方式收集、存储和处理大量数据。与传统数据仓库不同,实时数据湖不依赖于预先定义的模式,而是采用灵活的架构,可以处理各种数据类型和格式。这种架构为企业提供了以下优势: - **实时洞察:**实时数据湖允许企业访问最新的数据,从而做出更明智的决策。 - **数据民主化:**实时数据湖使各种利益相关者都可
recommend-type

SQL怎么实现 数据透视表

SQL可以通过使用聚合函数和GROUP BY子句来实现数据透视表。 例如,假设有一个销售记录表,其中包含产品名称、销售日期、销售数量和销售额等信息。要创建一个按照产品名称、销售日期和销售额进行汇总的数据透视表,可以使用以下SQL语句: ``` SELECT ProductName, SaleDate, SUM(SaleQuantity) AS TotalQuantity, SUM(SaleAmount) AS TotalAmount FROM Sales GROUP BY ProductName, SaleDate; ``` 该语句将Sales表按照ProductName和SaleDat
recommend-type

JSBSim Reference Manual

JSBSim参考手册,其中包含JSBSim简介,JSBSim配置文件xml的编写语法,编程手册以及一些应用实例等。其中有部分内容还没有写完,估计有生之年很难看到完整版了,但是内容还是很有参考价值的。
recommend-type

"互动学习:行动中的多样性与论文攻读经历"

多样性她- 事实上SCI NCES你的时间表ECOLEDO C Tora SC和NCESPOUR l’Ingén学习互动,互动学习以行动为中心的强化学习学会互动,互动学习,以行动为中心的强化学习计算机科学博士论文于2021年9月28日在Villeneuve d'Asq公开支持马修·瑟林评审团主席法布里斯·勒菲弗尔阿维尼翁大学教授论文指导奥利维尔·皮耶昆谷歌研究教授:智囊团论文联合主任菲利普·普雷教授,大学。里尔/CRISTAL/因里亚报告员奥利维耶·西格德索邦大学报告员卢多维奇·德诺耶教授,Facebook /索邦大学审查员越南圣迈IMT Atlantic高级讲师邀请弗洛里安·斯特鲁布博士,Deepmind对于那些及时看到自己错误的人...3谢谢你首先,我要感谢我的两位博士生导师Olivier和Philippe。奥利维尔,"站在巨人的肩膀上"这句话对你来说完全有意义了。从科学上讲,你知道在这篇论文的(许多)错误中,你是我可以依
recommend-type

实现实时监控告警系统:Kafka与Grafana整合

![实现实时监控告警系统:Kafka与Grafana整合](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9tbWJpei5xcGljLmNuL21tYml6X2pwZy9BVldpY3ladXVDbEZpY1pLWmw2bUVaWXFUcEdLT1VDdkxRSmQxZXB5R1lxaWNlUjA2c0hFek5Qc3FyRktudFF1VDMxQVl3QTRXV2lhSWFRMEFRc0I1cW1ZOGcvNjQw?x-oss-process=image/format,png) # 1.1 Kafka集群架构 Kafka集群由多个称为代理的服务器组成,这
recommend-type

使用vue3+elementsplus封装一个提示确认框的组件,要求将请求地址和确认框展示信息作为参数暴露出去

好的,下面是使用 Vue3 和 ElementPlus 封装一个提示确认框的组件,同时将请求地址和确认框展示信息作为参数暴露出去。 ```vue <template> <el-dialog title="确认提示" :visible.sync="dialogVisible" width="30%" :before-close="handleClose" > <p>{{ message }}</p> <span slot="footer" class="dialog-footer"> <el-button @click="di