环境说明: 服务端登录地址详见各任务服务端说明。 补充说明:各主机可通过Asbru工具或SSH客户端进行SSH访问; 主节点MySQL数据库用户名/密码:root/123456(已配置远程连接); Spark任务在Yarn上用Client运行,方便观察日志。 子任务一:数据抽取 编写Scala代码,使用Spark将MySQL库中表ChangeRecord,BaseMachine,MachineData, ProduceRecord全量抽取到Hudi的hudi_gy_ods库(路径为/user/hive/warehouse/hudi_gy_ods.db)中对应表changerecord,basemachine, machinedata,producerecord中。 1、 抽取MySQL的shtd_industry库中ChangeRecord表的全量数据进入Hudi的hudi_gy_ods库中表changerecord,字段排序、类型不变,分区字段为etldate,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。PRECOMBINE_FIELD使用ChangeEndTime,ChangeID和ChangeMachineID作为联合主键。使用spark-sql的cli执行select count(*) from hudi_gy_ods.changerecord命令,将spark-sql的cli执行结果分别截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
时间: 2023-11-28 11:50:37 浏览: 149
基于C++和POLL的服务端和客户端源码(含项目说明+详细注释).zip
5星 · 资源好评率100%
抽取MySQL的shtd_industry库中ChangeRecord表的全量数据进入Hudi的hudi_gy_ods库中表changerecord的Scala代码如下:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
object ChangeRecordDataExtract {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("ChangeRecordDataExtract")
.enableHiveSupport()
.getOrCreate()
val etlDate = args(0)
val hoodieOptions = Map(
TABLE_TYPE_OPT_VAL -> MOR_TABLE_TYPE_OPT_VAL,
TABLE_NAME -> "changerecord",
RECORDKEY_FIELD_OPT_KEY -> "ChangeID",
PRECOMBINE_FIELD_OPT_KEY -> "ChangeEndTime,ChangeID,ChangeMachineID",
PARTITIONPATH_FIELD_OPT_KEY -> "etldate",
OPERATION_OPT_KEY -> UPSERT_OPERATION_OPT_VAL,
KEYGENERATOR_CLASS_OPT_KEY -> classOf[SimpleKeyGenerator].getName,
PATH_OPT_KEY -> s"/user/hive/warehouse/hudi_gy_ods.db/changerecord",
HUDI_CLEANER_POLICY_OPT_KEY -> "KEEP_LATEST_COMMITS",
META_SYNC_ENABLED_OPT_KEY -> "false",
HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
HIVE_PARTITION_FIELDS_OPT_KEY -> "etldate",
HIVE_DATABASE_OPT_KEY -> "hudi_gy_ods",
HIVE_TABLE_OPT_KEY -> "changerecord"
)
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_industry"
val jdbcProperties = new java.util.Properties()
jdbcProperties.setProperty("user", "root")
jdbcProperties.setProperty("password", "123456")
val changeRecordDF = spark.read.jdbc(jdbcUrl, "ChangeRecord", jdbcProperties)
.withColumn("etldate", lit(etlDate))
changeRecordDF.write
.format("org.apache.hudi")
.options(hoodieOptions)
.mode("append")
.save()
spark.sql("select count(*) from hudi_gy_ods.changerecord").show()
spark.stop()
}
}
```
其中,etlDate为当前比赛日的前一天日期,使用lit函数将该值添加为一个新的列etldate。hoodieOptions为Hudi的写入配置,对应Hudi表changerecord。jdbcUrl和jdbcProperties为连接MySQL的配置,使用spark.read.jdbc读取MySQL表ChangeRecord的数据。最后使用Hudi的save方法将数据写入Hudi表中,并使用spark.sql查询Hudi表changerecord的数据量。
阅读全文