scala编写抽取shtd_store库中sku_info的增量数据进入Hive的ods库中表sku_info。根据ods.sku_info表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)数据库用户名为root数据库密码为123456
时间: 2023-05-30 08:02:55 浏览: 138
全国职业技能大赛大数据赛项十套赛题(shtd)
import org.apache.spark.sql.SparkSession
object IncrementalDataExtract {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("IncrementalDataExtract").enableHiveSupport().getOrCreate()
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store"
val username = "root"
val password = "123456"
val startDate = args(0) // 增量抽取的起始日期,格式为yyyyMMdd
val endDate = args(1) // 增量抽取的结束日期,格式为yyyyMMdd
val odsTable = "ods.sku_info"
// 创建静态分区,分区字段为dt,值为当前比赛日的前一天日期
val dt = java.time.LocalDate.now.minusDays(1).toString.replaceAll("-", "")
spark.sql(s"ALTER TABLE $odsTable ADD IF NOT EXISTS PARTITION (dt=$dt)")
// 查询增量数据
val incrementalData = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", s"(SELECT * FROM sku_info WHERE create_time >= '$startDate' AND create_time <= '$endDate') AS incremental_data")
.option("user", username)
.option("password", password)
.load()
// 将增量数据保存到ods表中
incrementalData.write.mode("append").insertInto(odsTable)
spark.stop()
}
}
阅读全文