使用Scala编写spark工程代码,将MySQL的shtd_store库中表user_info、sku_info、base_province、base_region、order_info、order_detail的数据增量抽取到Hive的ods库中对应表user_info、sku_info、base_province、base_region、order_info、order_detail中。 抽取shtd_store库中user_info的增量数据进入Hive的ods库中表user_info。根据ods.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段类型为String,且值为当前2023年4月16的前一天日期(分区字段格式为yyyyMMdd)数据库用户名为root密码为123456
时间: 2023-05-25 16:06:16 浏览: 140
以下是Scala代码实现:
```scala
import org.apache.spark.sql.SparkSession
import java.sql.DriverManager
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
object IncrementalDataLoader {
def main(args: Array[String]): Unit = {
//初始化SparkSession
val spark = SparkSession.builder().appName("IncrementalDataLoader").enableHiveSupport().getOrCreate()
//获取MySQL中的新数据
val mysqlDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/shtd_store")
.option("dbtable", "user_info")
.option("user", "root")
.option("password", "123456")
.option("driver", "com.mysql.jdbc.Driver")
.load()
//获取ods库中user_info表中的最大操作时间
val max_operate_time = spark.sql("select max(operate_time) from ods.user_info")
.first()
.getTimestamp(0)
val max_create_time = spark.sql("select max(create_time) from ods.user_info")
.first()
.getTimestamp(0)
//筛选出新数据
val newDF = mysqlDF.filter(row => {
val operate_time = row.getTimestamp(0)
val create_time = row.getTimestamp(1)
operate_time.after(max_operate_time) || create_time.after(max_create_time)
})
//获取当前时间及前一天的日期
val time_formatter = DateTimeFormatter.ofPattern("yyyyMMdd")
val date = LocalDateTime.now().minusDays(1).format(time_formatter)
//将新增数据添加静态分区
newDF.write.format("hive")
.option("hive.exec.dynamic.partition.mode", "nonstrict")
.option("hive.exec.dynamic.partition", "true")
.option("hive.partition.mode", "nonstrict")
.option("hive.mapred.supports.subdirectories", "true")
.option("hive.exec.max.dynamic.partitions", "2048")
.option("hive.exec.max.dynamic.partitions.pernode", "256")
.option("partition_cols", "date")
.insertInto("ods.user_info")
}
}
```
说明:
1. 首先,我们从MySQL中读取增量数据,并使用SparkSession初始化一个Spark应用程序。
2. 然后,我们查询ods库中user_info表的最大操作时间和创建时间。这可以帮助我们确定新增数据的最新时间。
3. 下一步,我们使用过滤器筛选出新增的数据。
4. 接下来,我们获取当前时间及前一天的日期,用于添加静态分区。
5. 最后,我们将新增数据写入ods库中user_info表,并为分区添加静态分区。
注意:
1. 要将代码中的MySQL连接URL、用户名、密码修改为你自己的MySQL连接URL、用户名和密码。
2. 确保已经创建了ods.user_info表,并且分区字段名为date,类型为String。
阅读全文