scala spark 将 dwd 库中表的数据关联到 dws 库中 table12 的分区表,分区字段为 etl_date,类型为 String,且值为当前比赛日的前一天日期(分区字段格式 为 yyyyMMdd),并添加 dws_insert_user、dws_insert_time、dws_modify_user、 dws_modify_time 四列,其中 dws_insert_user、dws_modify_user 均填写 “user1”,dws_insert_time、dws_modify_time 均填写当前操作时间(年 月日必须是今天,时分秒只需在比赛时间范围内即可),并进行数据类型转 换
时间: 2023-06-13 10:07:24 浏览: 261
假设我们的dwd库中有一个表叫做dwd_table,我们需要将其关联到dws库中的table12分区表中,可以按照下面的步骤进行操作:
1. 创建dws库中的table12分区表,并定义分区字段etl_date,创建四个额外的列dws_insert_user、dws_insert_time、dws_modify_user、dws_modify_time:
```scala
spark.sql("CREATE TABLE dws.table12 (col1 String, col2 Int, etl_date String) PARTITIONED BY (etl_date String) " +
"TBLPROPERTIES('parquet.compress'='SNAPPY', 'parquet.compression.codec'='snappy')")
spark.sql("ALTER TABLE dws.table12 ADD COLUMNS (dws_insert_user String, dws_insert_time Timestamp, " +
"dws_modify_user String, dws_modify_time Timestamp)")
```
2. 获取比赛日的前一天日期,并将其转换为字符串格式:
```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
val dtFormatter = DateTimeFormatter.ofPattern("yyyyMMdd")
val etl_date = dtFormatter.format(LocalDate.now().minusDays(1))
```
3. 将dwd_table中的数据关联到dws库中的table12分区表中,分区字段为etl_date,同时添加四个额外的列:
```scala
val insert_user = "user1"
val modify_user = "user1"
val insert_time = java.sql.Timestamp.valueOf(java.time.LocalDateTime.now())
val modify_time = java.sql.Timestamp.valueOf(java.time.LocalDateTime.now())
spark.sql(s"INSERT INTO dws.table12 PARTITION(etl_date='$etl_date')" +
s"SELECT col1, col2, '$etl_date', '$insert_user', '$insert_time', '$modify_user', '$modify_time' " +
"FROM dwd.dwd_table")
```
4. 对添加的四个列进行数据类型转换:
```scala
spark.sql("ALTER TABLE dws.table12 " +
"CHANGE COLUMN dws_insert_time dws_insert_time Timestamp " +
"CHANGE COLUMN dws_modify_time dws_modify_time Timestamp")
```
这样,我们就完成了将dwd库中表的数据关联到dws库中table12的分区表中,并添加四个额外列的操作。
阅读全文