Spark离线处理:增量抽取MySQL至Hive的实践

需积分: 0 16 下载量 149 浏览量 更新于2024-08-03 收藏 109KB DOCX 举报
在本次离线数据处理任务中,主要目标是从MySQL数据库的ds_db01库中的customer_inf表中抽取增量数据,并将这些数据导入Hive的ods库中,创建名为customer_inf的表。这个过程需要利用Apache Spark进行数据处理,特别是使用Spark SQL来实现数据抽取和转换。 首先,为了连接MySQL数据库,需要确保mysql-connector-j-8.1.0.jar包已复制到Spark的jars目录中,以便Spark能够正确地与MySQL交互。MySQL表中有一个名为modified_time的字段,它将被用作确定增量数据的依据,只有当该字段的值大于等于19999时,记录才会被抽取到ods库。 在数据抽取过程中,将通过一个临时表data执行SQL查询,其条件是modified_time字段的值小于指定的时间戳'2023-1-1',从而获取19999之后的新增数据。对于datetime和timestamp类型的字段,在导入到Hive时,可能需要进行类型转换,因为Hive的默认行为可能与MySQL有所不同。 在SparkSession的构建中,设置了master节点的URL、应用名称、Hive支持、以及仓库目录和元数据存储库的URI。这些配置确保了Spark能够正确地与Hive集成,并且数据会被存储在指定的HDFS路径下。第一天的任务是全量抽取数据并清洗,然后存入ods库;第二天则执行增量抽取并清洗,同样存入dwd库。 具体操作代码如下: ```java import org.apache.spark.sql.SparkSession object DataExtractor { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("spark://master:7077") .appName("抽取数据") .enableHiveSupport() // 启用Hive支持 .config("spark.sql.warehouse.dir", "hdfs://master:50070/usr/hive/warehouse") // 设置仓库目录 .config("hive.metastore.uris", "thrift://master:9083") // 设置元数据存储库URI .getOrCreate() val jdbcUrl = "jdbc:mysql://master:3306/ds_db01?serverTimezone=GMT" // MySQL连接字符串,考虑到时间区间的差异 val dataFrame = spark.read.format("jdbc") // 使用JDBC读取MySQL数据 .option("url", jdbcUrl) .option("dbtable", "customer_inf") // 表名 .option("user", "your_username") // 数据库用户名 .option("password", "your_password") // 数据库密码 .load() // 加载数据 // 增量抽取 val modifiedTimeCutOff = to_timestamp("2023-1-1") val incrementalDataFrame = dataFrame.filter($"modified_time" >= modifiedTimeCutOff) // 添加分区 etl_date 为前一天日期(yyyyMMdd) val currentDate = java.time.LocalDate.now().minusDays(1).format("yyyyMMdd") incrementalDataFrame.write .partitionBy("etl_date", StringType(), currentDate) .format("parquet") // 或者根据需求选择其他格式 .mode("append") // 按照追加模式写入,确保增量数据不覆盖现有分区 .saveAsTable("ods.customer_inf") // 清洗步骤(这里未展示,可以根据实际清洗需求编写清洗逻辑) // ... spark.stop() // 关闭SparkSession } } ``` 总结来说,这个任务的核心在于使用Spark的JDBC读取功能从MySQL抽取数据,然后根据时间戳过滤增量数据,最后按照指定格式添加分区并写入Hive。同时,要注意调整连接字符串以处理MySQL中的时间类型,并确保配置了正确的Spark和Hive连接参数。在后续的清洗阶段,可能涉及数据清洗、转换等操作,这取决于具体的数据质量和业务需求。