Using Spark to extract incremental data from a specified MySQL table into a designated partition table in the ODS layer
This can be answered. With Spark, incremental data can be extracted by reading MySQL's binlog. The steps are: configure the binlog parameters, read the binlog with Spark, parse the change events, and write the resulting rows into the designated partition table in the ODS layer. When the source tables carry reliable create/update timestamps, a common alternative is to pull increments over JDBC by comparing those timestamps with the latest values already loaded into the ODS table, which is the approach used in the examples below.
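A minimal sketch of the timestamp-based JDBC pull, meant to run in spark-shell with Hive support; the connection URL, credentials, table name, watermark column `update_time`, and partition value are placeholders, not anything specified in the original answer:
```scala
import org.apache.spark.sql.functions._

// Watermark: the newest update time already present in the ODS table
// (assumes the ODS table already contains a prior load)
val watermark = spark.sql("select max(update_time) from ods.some_table")
  .first().getTimestamp(0)

// Read the MySQL table and keep only rows newer than the watermark
val increment = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://mysql-host:3306/source_db")   // placeholder
  .option("dbtable", "some_table")                           // placeholder
  .option("user", "user")                                    // placeholder
  .option("password", "password")                            // placeholder
  .load()
  .filter(col("update_time") > lit(watermark))
  .withColumn("etl_date", lit("20230415"))                   // placeholder partition value

// Append into the partitioned ODS table; columns are matched by position,
// so the partition column must come last
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
increment.write.mode("append").insertInto("ods.some_table")
```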
Related questions
Write Spark job code in Scala to incrementally extract the tables user_info, sku_info, base_province, base_region, order_info, and order_detail from the MySQL database shtd_store into the corresponding tables user_info, sku_info, base_province, base_region, order_info, and order_detail in the Hive database ods. For user_info, use operate_time or create_time in ods.user_info as the incremental field (that is, for each MySQL row take the greater of the two timestamps and compare it with the greater of the same two fields already in ods), load only the newly added rows, keep field names and types unchanged, and add a static partition whose field type is String and whose value is the day before April 16, 2023 (partition field format yyyyMMdd).
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object IncrementalDataExtraction {
  def main(args: Array[String]): Unit = {
    // Create a Spark session with Hive support so we can write into the ods database
    val spark = SparkSession.builder()
      .appName("Incremental Data Extraction")
      .enableHiveSupport()
      .getOrCreate()

    // Static partition value: the day before 2023-04-16, formatted as yyyyMMdd (20230415)
    val dateFormat = DateTimeFormatter.ofPattern("yyyyMMdd")
    val partitionDate = LocalDate.of(2023, 4, 16).minusDays(1).format(dateFormat)

    // Helper: load one table of the shtd_store database over JDBC
    def readMySQL(table: String): DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/shtd_store")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", table)
      .option("user", "your_user_name")
      .option("password", "your_password")
      .load()

    // Load the source tables; the five tables other than user_info follow the same
    // incremental pattern shown below for user_info
    val user_info     = readMySQL("user_info")
    val sku_info      = readMySQL("sku_info")
    val base_province = readMySQL("base_province")
    val base_region   = readMySQL("base_region")
    val order_info    = readMySQL("order_info")
    val order_detail  = readMySQL("order_detail")

    // Incremental watermark: the greater of operate_time / create_time already present
    // in ods.user_info (assumes the table already holds a previous load)
    val odsMaxTime = spark.sql(
      "select max(greatest(operate_time, create_time)) from ods.user_info"
    ).first().getTimestamp(0)

    // Keep only MySQL rows whose greater timestamp is newer than the ods watermark,
    // then append the static partition column; with insertInto, columns are matched by
    // position, so the partition column must come last and correspond to the partition
    // field of ods.user_info
    val user_info_incremental = user_info
      .filter(greatest(col("operate_time"), col("create_time")) > lit(odsMaxTime))
      .withColumn("partition_date", lit(partitionDate))

    // Allow dynamic partition insert and write the increment into ods.user_info
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    user_info_incremental.write.mode("append").insertInto("ods.user_info")

    // Stop the Spark session
    spark.stop()
  }
}
```
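After the job finishes, the new partition can be checked from spark-shell. A quick verification sketch; the second query assumes the partition field in the ods.user_info DDL is named etl_date, which the task statement does not specify:
```scala
// List the partitions of the target Hive table
spark.sql("show partitions ods.user_info").show(false)
// Hypothetical row count for the new partition, assuming the partition field is etl_date
spark.sql("select count(*) from ods.user_info where etl_date = '20230415'").show()
```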
Extract the incremental data of user_info from the shtd_store database into table user_info in the Hudi database ods_ds_hudi. Use operate_time or create_time in ods_ds_hudi.user_info as the incremental field (that is, for each MySQL row take the greater of the two timestamps and compare it with the greater of those two fields already in ods), load only the newly added rows, and keep field names and types unchanged. Add a partition: if operate_time is null, fill it with create_time; the partition field is etl_date, of type String, with a value equal to the day before the current competition day (format yyyyMMdd). Use id as the primaryKey and operate_time as the preCombineField. Then run `show partitions ods_ds_hudi.user_info` from spark-shell.
Below is example code for extracting the incremental data into Hudi:
```scala
import org.apache.spark.sql.functions._

// JDBC connection to the source MySQL database
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false&serverTimezone=UTC"
val dbProperties = new java.util.Properties()
dbProperties.setProperty("user", "root")
dbProperties.setProperty("password", "root")
val user_df = spark.read.jdbc(jdbcUrl, "user_info", dbProperties)

// Hudi table location and write options (plain config keys)
val basePath = "hdfs://localhost:9000/user/hive/warehouse/ods_ds_hudi.db/user_info"
val hudi_options = Map[String, String](
  "hoodie.table.name" -> "user_info",
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.recordkey.field" -> "id",            // primaryKey
  "hoodie.datasource.write.precombine.field" -> "operate_time", // preCombineField
  "hoodie.datasource.write.partitionpath.field" -> "etl_date",  // partition field
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  // Sync the table to Hive so that `show partitions ods_ds_hudi.user_info` works
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.hive_sync.database" -> "ods_ds_hudi",
  "hoodie.datasource.hive_sync.table" -> "user_info",
  "hoodie.datasource.hive_sync.partition_fields" -> "etl_date",
  "hoodie.datasource.hive_sync.partition_extractor_class" ->
    "org.apache.hudi.hive.MultiPartKeysValueExtractor"
)

// Partition value: the day before the competition day, formatted as yyyyMMdd
val etl_date = java.time.LocalDate.now.minusDays(1)
  .format(java.time.format.DateTimeFormatter.BASIC_ISO_DATE)

// Incremental watermark: the greater of operate_time / create_time already in
// ods_ds_hudi.user_info (assumes the table already holds a previous load)
val maxTime = spark.read.format("hudi").load(basePath)
  .select(max(greatest(col("operate_time"), col("create_time"))))
  .first().getTimestamp(0)

// Keep only rows newer than the watermark, fill a null operate_time with create_time,
// and add the etl_date partition column; all source columns and types stay unchanged
val hudi_df = user_df
  .filter(greatest(col("operate_time"), col("create_time")) > lit(maxTime))
  .withColumn("operate_time", coalesce(col("operate_time"), col("create_time")))
  .withColumn("etl_date", lit(etl_date))

hudi_df.write
  .format("hudi")
  .options(hudi_options)
  .mode("append")
  .save(basePath)
```
After the write completes, run `show partitions ods_ds_hudi.user_info` from spark-shell (or Hive) to verify the partitions.
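For example, from spark-shell (this relies on the Hive sync options enabled in hudi_options above having created the table in the ods_ds_hudi database):
```scala
// Shows the etl_date partitions synced from the Hudi table
spark.sql("show partitions ods_ds_hudi.user_info").show(false)
```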