spark sql scala 若该条记录第一次 进入数仓 dwd 层则 dwd_insert_time、dwd_modify_time 均存当前操作时间, 并进行数据类型转换。若该数据在进入 dwd 层时发生了合并修改,则 dwd_insert_time 时间不变,dwd_modify_time 存当前操作时间,其余列存 最新的值。
时间: 2023-06-13 12:08:19 浏览: 134
在 Spark SQL Scala 中,你可以使用 DataFrame 的 API 进行该操作。首先,你需要将数据加载到 DataFrame 中,然后根据业务需求对数据进行转换和处理。针对你提到的情况,可以采用以下方式进行处理:
1. 加载数据到 DataFrame 中:
```scala
val df = spark.read.format("csv").load("path/to/csv")
```
2. 判断该条记录是否第一次进入数仓 dwd 层:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val existingCols = df.columns
val newCols = Seq("dwd_insert_time", "dwd_modify_time") ++ existingCols
val schema = StructType(newCols.map(c => StructField(c, StringType, true)))
val existingData = spark.read.table("dwd_table")
val newData = df
.join(existingData, existingData("id") === df("id"), "left_anti")
.withColumn("dwd_insert_time", current_timestamp())
.withColumn("dwd_modify_time", current_timestamp())
.select(newCols.head, newCols.tail: _*)
.withColumn("dwd_insert_time", col("dwd_insert_time").cast(TimestampType))
.withColumn("dwd_modify_time", col("dwd_modify_time").cast(TimestampType))
```
在上述代码中,我们通过读取已有的 dwd_table 表来判断该条记录是否第一次进入数仓 dwd 层,如果该记录在已有表中不存在,则认为该记录是第一次进入数仓 dwd 层,此时我们需要为新记录的 dwd_insert_time 和 dwd_modify_time 列赋值为当前时间,同时将原有的列名和值拼接到 DataFrame 中。最后,我们将时间类型转换为 TimestampType。
3. 对于已存在的记录,进行合并修改:
```scala
val existingData = spark.read.table("dwd_table")
val newData = df
.join(existingData, existingData("id") === df("id"), "inner")
.withColumn("dwd_modify_time", current_timestamp())
.select(newCols.head, newCols.tail: _*)
.withColumn("dwd_insert_time",
when(col("dwd_insert_time").isNull, current_timestamp()).otherwise(col("dwd_insert_time")))
.withColumn("dwd_modify_time", col("dwd_modify_time").cast(TimestampType))
```
在上述代码中,我们通过读取已有的 dwd_table 表来判断该记录是否已经存在于数仓 dwd 层,如果该记录已经存在,则认为该记录发生了合并修改,此时我们需要保留原有的 dwd_insert_time 值,并将 dwd_modify_time 赋值为当前时间。同时,我们需要将原有的列名和值拼接到 DataFrame 中,并将时间类型转换为 TimestampType。
4. 将处理后的数据保存到 dwd_table 表中:
```scala
newData.write.mode(SaveMode.Append).saveAsTable("dwd_table")
```
在上述代码中,我们将处理后的数据保存到 dwd_table 表中,使用 Append 模式进行追加操作。
完整的代码如下所示:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types._
val existingCols = df.columns
val newCols = Seq("dwd_insert_time", "dwd_modify_time") ++ existingCols
val schema = StructType(newCols.map(c => StructField(c, StringType, true)))
val existingData = spark.read.table("dwd_table")
val newData = df
.join(existingData, existingData("id") === df("id"), "left_anti")
.withColumn("dwd_insert_time", current_timestamp())
.withColumn("dwd_modify_time", current_timestamp())
.select(newCols.head, newCols.tail: _*)
.withColumn("dwd_insert_time", col("dwd_insert_time").cast(TimestampType))
.withColumn("dwd_modify_time", col("dwd_modify_time").cast(TimestampType))
val existingData = spark.read.table("dwd_table")
val newData = df
.join(existingData, existingData("id") === df("id"), "inner")
.withColumn("dwd_modify_time", current_timestamp())
.select(newCols.head, newCols.tail: _*)
.withColumn("dwd_insert_time",
when(col("dwd_insert_time").isNull, current_timestamp()).otherwise(col("dwd_insert_time")))
.withColumn("dwd_modify_time", col("dwd_modify_time").cast(TimestampType))
newData.write.mode(SaveMode.Append).saveAsTable("dwd_table")
```
阅读全文