使用Scala编写spark工程代码,将MySQL的shtd_store库中表user_info、sku_info、base_province、base_region、order_info、order_detail的数据增量抽取到Hive的ods库中对应表user_info、sku_info、base_province、base_region、order_info、order_detail中。
时间: 2023-05-25 10:06:11 浏览: 233
以下为伪代码,实现过程需要根据实际情况进行调整:
```scala
import java.util.Properties

import org.apache.spark.sql.SparkSession
object IncrementalETL {

  /** Tables extracted incrementally from MySQL `shtd_store` into Hive `ods` (same names on both sides). */
  private val Tables: Seq[String] =
    Seq("user_info", "sku_info", "base_province", "base_region", "order_info", "order_detail")

  private val JdbcUrl = "jdbc:mysql://localhost:3306/shtd_store"

  /**
   * Entry point: builds a Hive-enabled SparkSession and incrementally
   * extracts every table in [[Tables]] from MySQL into Hive's ods database.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Incremental ETL")
      .enableHiveSupport()
      .getOrCreate()

    // try/finally so the session is stopped even if one extraction fails
    // (the original fell through without cleanup on error).
    try {
      Tables.foreach(extractTable(spark, _))
    } finally {
      spark.stop()
    }
  }

  /**
   * Incrementally reads one MySQL table (rows with `update_time` newer than
   * what ods already holds) and appends them to `ods.<table>`.
   *
   * NOTE(review): partitioning on `update_time` as a numeric range assumes the
   * column is stored as an epoch-millis BIGINT — confirm against the schema.
   */
  private def extractTable(spark: SparkSession, table: String): Unit = {
    // DataFrameReader.jdbc's partition-column parameter is named `columnName`;
    // the original used `column = ...`, which does not compile. Positional
    // args with comments avoid the named-parameter pitfall entirely.
    val df = spark.read.jdbc(
      JdbcUrl,
      table,
      "update_time",               // columnName: partition column
      getLowerBound(table),        // lowerBound: last extracted update_time
      System.currentTimeMillis(),  // upperBound: now
      10,                          // numPartitions
      getProperties()
    )
    df.write
      .mode("append")
      .insertInto(s"ods.$table")
  }

  /** JDBC connection properties for the MySQL source. */
  def getProperties(): Properties = {
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    // com.mysql.jdbc.Driver is deprecated; the Connector/J 8+ class is:
    properties.put("driver", "com.mysql.cj.jdbc.Driver")
    properties
  }

  /**
   * Returns the max `update_time` already present in `ods.<table>`,
   * or 0 when the table is empty (so the first run extracts everything).
   */
  def getLowerBound(table: String): Long = {
    // The original referenced `main`'s local `spark`, which is out of scope
    // here. getOrCreate() returns the already-active session instead.
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    val row = spark.sql(s"select max(update_time) from ods.$table").collect()(0)
    // max(...) over an empty table yields a NULL cell. The original called
    // .toString.toLong BEFORE its null check, which would NPE; test first.
    if (row.isNullAt(0)) 0L else row.getLong(0)
  }
}
```
相关推荐
![rar](https://img-home.csdnimg.cn/images/20210720083606.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![7z](https://img-home.csdnimg.cn/images/20210720083312.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)