scala spark 抽取 db 库中 table4 的增量数据进入 Hive 的 ods 库中表 table4,增量字段 取 ods.table4 表中 time1、time2 中的最大者,只将新增的数据抽入,字段 名称、类型不变,同时添加静态分区,分区字段为 etl_date,类型为 String, 且值为当前比赛日的前一天日期(分区字段格式为 yyyyMMdd)详细代码以及两种方法
时间: 2023-06-14 20:04:43 浏览: 204
方法一:
```scala
import org.apache.spark.sql.functions._
import java.text.SimpleDateFormat
import java.util.Calendar
val dbUrl = "jdbc:mysql://localhost:3306/db"
val dbUser = "user"
val dbPwd = "password"
val hiveDb = "ods"
// 获取当前比赛日的前一天日期
val date = Calendar.getInstance()
date.add(Calendar.DATE, -1)
val etlDate = new SimpleDateFormat("yyyyMMdd").format(date.getTime())
// 初始化 SparkSession
val spark = SparkSession.builder()
.appName("db to hive")
.enableHiveSupport()
.getOrCreate()
// 读取 db 库中 table4 表的最大时间戳
val maxTimeDF = spark.read.format("jdbc")
.option("url", dbUrl)
.option("dbtable", "table4")
.option("user", dbUser)
.option("password", dbPwd)
.load()
.agg(max(col("time1")).as("max_time"))
// 如果表中没有数据,则将最大时间戳设置为 0
val maxTime = maxTimeDF.head().getAs[java.sql.Timestamp]("max_time")
val maxTimeStr = if (maxTime != null) maxTime.toString() else "0"
// 从 db 库中读取增量数据
val df = spark.read.format("jdbc")
.option("url", dbUrl)
.option("dbtable", s"(SELECT * FROM table4 WHERE time1 > '$maxTimeStr') as table4")
.option("user", dbUser)
.option("password", dbPwd)
.load()
// 添加静态分区
val resultDF = df.withColumn("etl_date", lit(etlDate))
// 将数据写入 Hive 的 ods 库中的 table4 表中
resultDF.write.mode("append").partitionBy("etl_date")
.format("hive")
.saveAsTable(s"$hiveDb.table4")
```
方法二:
```scala
import org.apache.spark.sql.functions._
import java.text.SimpleDateFormat
import java.util.Calendar
val dbUrl = "jdbc:mysql://localhost:3306/db"
val dbUser = "user"
val dbPwd = "password"
val hiveDb = "ods"
// 获取当前比赛日的前一天日期
val date = Calendar.getInstance()
date.add(Calendar.DATE, -1)
val etlDate = new SimpleDateFormat("yyyyMMdd").format(date.getTime())
// 初始化 SparkSession
val spark = SparkSession.builder()
.appName("db to hive")
.enableHiveSupport()
.getOrCreate()
// 读取 Hive 的 ods 库中的 table4 表中时间戳最大的记录
val maxTimeDF = spark.sql(s"SELECT MAX(time1) as max_time FROM $hiveDb.table4")
val maxTime = maxTimeDF.head().getAs[java.sql.Timestamp]("max_time")
// 如果表中没有数据,则将最大时间戳设置为 0
val maxTimeStr = if (maxTime != null) maxTime.toString() else "0"
// 从 db 库中读取增量数据
val df = spark.read.format("jdbc")
.option("url", dbUrl)
.option("dbtable", s"(SELECT * FROM table4 WHERE time1 > '$maxTimeStr') as table4")
.option("user", dbUser)
.option("password", dbPwd)
.load()
// 添加静态分区
val resultDF = df.withColumn("etl_date", lit(etlDate))
// 将数据写入 Hive 的 ods 库中的 table4 表中
resultDF.write.mode("append").partitionBy("etl_date")
.format("hive")
.saveAsTable(s"$hiveDb.table4")
```
两种方法的区别在于,方法一通过读取 db 库中的表获取最大时间戳,而方法二通过读取 Hive 的表获取最大时间戳。方法一可能会比方法二更快,因为读取 db 库中的表比读取 Hive 的表更快,但是方法一需要在 Spark 中进行一次聚合操作,而方法二不需要。
阅读全文