Write Spark job code in Scala that incrementally extracts data from the tables user_info, sku_info, base_province, base_region, order_info, and order_detail in the MySQL shtd_store database into the corresponding tables user_info, sku_info, base_province, base_region, order_info, and order_detail in the Hive ods database.
The following Spark code, written in Scala, incrementally extracts new rows from the MySQL tables into the corresponding Hive tables:
```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.util.Properties

object MySQLToHiveIncremental {

  def main(args: Array[String]): Unit = {
    // Initialize the Spark session with Hive support enabled
    val spark = SparkSession.builder()
      .appName("MySQL To Hive Incremental")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // MySQL connection settings
    val mysqlUrl = "jdbc:mysql://localhost:3306/shtd_store"
    val mysqlProperties = new Properties()
    mysqlProperties.setProperty("user", "root")
    mysqlProperties.setProperty("password", "password")
    mysqlProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    // Target Hive database and the six tables to synchronize; the source
    // and target tables share the same names
    val databaseName = "ods"
    val tables = Seq("user_info", "sku_info", "base_province", "base_region",
      "order_info", "order_detail")

    // For each table: look up the newest timestamp already in Hive, fetch
    // only the MySQL rows updated after it, and append them to Hive
    tables.foreach { table =>
      val latestTimestamp = getLatestTimestampFromHive(spark, databaseName, table)
      val query = s"SELECT * FROM $table WHERE updated_at > '$latestTimestamp'"
      val incrementDF = loadDataFromMySQL(spark, mysqlUrl, query, mysqlProperties)
      writeDataToHive(incrementDF, databaseName, table)
    }

    // Stop the Spark session
    spark.stop()
  }

  /**
   * Load data from MySQL using JDBC.
   *
   * @param spark      Spark session
   * @param mysqlUrl   MySQL connection URL
   * @param query      SQL query selecting the incremental rows
   * @param properties JDBC connection properties (user, password, driver)
   * @return DataFrame containing the fetched rows
   */
  def loadDataFromMySQL(spark: SparkSession, mysqlUrl: String, query: String, properties: Properties): DataFrame = {
    // Wrap the query as a derived table so it can be passed where Spark
    // expects a table name; the WHERE filter is executed on the MySQL side
    spark.read.jdbc(mysqlUrl, s"($query) AS tmp", properties)
  }

  /**
   * Append a DataFrame to an existing Hive table.
   *
   * Note: insertInto resolves columns by position, so the DataFrame's
   * column order must match the Hive table definition.
   *
   * @param df       DataFrame containing the rows to write
   * @param database Hive database name
   * @param table    Hive table name
   */
  def writeDataToHive(df: DataFrame, database: String, table: String): Unit = {
    df.write.mode(SaveMode.Append).insertInto(s"$database.$table")
  }

  /**
   * Get the latest updated_at value from a Hive table, falling back to the
   * epoch when the table is empty so that the first run loads all rows.
   *
   * @param spark    Spark session
   * @param database Hive database name
   * @param table    Hive table name
   * @return Latest timestamp as a string
   */
  def getLatestTimestampFromHive(spark: SparkSession, database: String, table: String): String = {
    val row = spark.sql(s"SELECT MAX(updated_at) AS latest_ts FROM $database.$table").collect()(0)
    // Use get(...).toString rather than getString so this works whether
    // updated_at is stored as STRING or TIMESTAMP in Hive
    if (row.isNullAt(0)) "1970-01-01 00:00:00" else row.get(0).toString
  }
}
```
In this code, we first initialize a Spark session with Hive support and define the MySQL connection settings along with the Hive database and table names. For each table, we read the newest updated_at value already stored in Hive, build a MySQL query that selects only rows updated after that timestamp, load those rows over JDBC (the filter sits inside the pushed-down subquery and runs in MySQL, so only the incremental rows are transferred), and append them to the corresponding Hive table. Finally, we stop the Spark session.
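To build and run the job, the MySQL JDBC driver and Spark's Hive integration have to be on the classpath. A minimal sketch of the dependencies, assuming sbt as the build tool and Spark 3.x (the version numbers below are illustrative, not taken from the original task):
```scala
// build.sbt -- example dependency set for this job; versions are placeholders
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.1.2" % "provided",
  "org.apache.spark" %% "spark-hive"           % "3.1.2" % "provided",
  "mysql"            %  "mysql-connector-java" % "8.0.28"
)
```
After a run, a quick way to confirm that new rows landed is to check that the newest timestamp in the ods table has advanced. A minimal sketch using the same Spark session (ods.order_info is just one example table):
```scala
// Hypothetical smoke test: MAX(updated_at) should move forward after each incremental run
val latest = spark.sql("SELECT MAX(updated_at) AS latest_ts FROM ods.order_info").collect()(0)
println(s"ods.order_info latest updated_at: ${latest.get(0)}")
```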