环境说明: 服务端登录地址详见各任务服务端说明。 补充说明:各主机可通过Asbru工具或SSH客户端进行SSH访问; 主节点MySQL数据库用户名/密码:root/123456(已配置远程连接); Hive的配置文件位于/opt/apache-hive-2.3.4-bin/conf/ Spark任务在Yarn上用Client运行,方便观察日志。 子任务一:数据抽取 编写Scala代码,使用Spark将MySQL的shtd_store库中表user_info、sku_info、base_province、base_region、order_info、order_detail的数据增量抽取到Hive的ods库中对应表user_info、sku_info、base_province、base_region、order_info、order_detail中。 1、 抽取shtd_store库中user_info的增量数据进入Hive的ods库中表user_info。根据ods.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.user_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;在idea中编写
时间: 2023-11-28 21:49:21 浏览: 208
思谷手持机一期使用说明1
Scala代码,连接MySQL和Hive,并实现数据抽取功能。代码如下:
```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.apache.spark.sql.SparkSession
object DataExtract {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("DataExtract")
.enableHiveSupport()
.getOrCreate()
// MySQL配置信息
val mysqlUrl = "jdbc:mysql://192.168.1.101:3306/shtd_store"
val mysqlUser = "root"
val mysqlPwd = "123456"
// Hive配置信息
val hiveDBName = "ods"
val hiveTableNames = Array("user_info", "sku_info", "base_province", "base_region", "order_info", "order_detail")
// 获取当前比赛日的前一天日期
val sdf = new SimpleDateFormat("yyyyMMdd")
val cal = Calendar.getInstance()
cal.add(Calendar.DATE, -1)
val etlDate = sdf.format(cal.getTime)
// 循环抽取各个表的增量数据
for (tableName <- hiveTableNames) {
val hiveTableName = hiveDBName + "." + tableName
val mysqlTableName = tableName
val primaryKey = "id"
val incrField = "operate_time"
val createTimeField = "create_time"
// 获取Hive中最新的增量字段值
val sql = s"select max($incrField) from $hiveTableName where etl_date='$etlDate'"
val maxIncrFieldValue = spark.sql(sql).collect()(0)(0)
// 构造MySQL查询语句
var mysqlSql = s"select * from $mysqlTableName where 1=1"
if (maxIncrFieldValue != null) {
mysqlSql += s" and $incrField>'$maxIncrFieldValue'"
} else {
mysqlSql += s" and $createTimeField>'$etlDate'"
}
// 从MySQL读取数据
val jdbcDF = spark.read.format("jdbc")
.option("url", mysqlUrl)
.option("user", mysqlUser)
.option("password", mysqlPwd)
.option("dbtable", mysqlSql)
.load()
// 将数据写入Hive,并添加静态分区
jdbcDF.write.mode("append")
.partitionBy("etl_date")
.saveAsTable(hiveTableName, "hive", SaveMode.Append, Map("etl_date" -> etlDate))
}
spark.stop()
}
}
```
该程序将MySQL中的增量数据抽取到Hive中,并在Hive中添加了静态分区。其中,使用了Hive的SaveMode.Append模式,表示追加写入数据,如果表不存在则会自动创建。同时,使用了Spark的partitionBy函数来添加静态分区,确保数据能够按照日期进行查询和管理。
阅读全文