读取hudi分区字段
时间: 2023-07-18 13:03:35 浏览: 140
在 Hudi 中,每个数据集都有一个分区字段,它是用来将数据划分为不同小部分的字段。分区字段通常是日期、时间戳或者其他可以用来划分数据的字段。在 Hudi 中读取分区字段需要使用 HoodieTableMetaClient 类,该类提供了读取分区字段的方法。下面是一个示例代码:
```java
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(metaPath, jsc.hadoopConfiguration());
String partitionField = metaClient.getTableConfig().getPartitionFields().get(0);
System.out.println("Partition Field: " + partitionField);
```
上述代码中,`metaPath` 是 Hudi 数据集的元数据路径,`jsc` 是 SparkContext 对象。`getTableConfig()` 方法返回 Hudi 数据集的配置信息,`getPartitionFields()` 方法返回分区字段列表,我们可以从中获取第一个分区字段。
相关问题
scala抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField
以下是Scala代码示例,实现抽取增量数据进入Hudi的ods_ds_hudi库中表user_info:
```scala
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.time.LocalDate
import java.time.format.DateTimeFormatter
object IncrementalDataImport {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("IncrementalDataImport")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false&serverTimezone=UTC"
val jdbcUser = "root"
val jdbcPassword = "root"
val hudiTablePath = "/user/hive/warehouse/ods_ds_hudi.db/user_info"
val hudiTableName = "user_info"
val primaryKey = "id"
val preCombineField = "operate_time"
val partitionField = "etl_date"
// 获取当前比赛日前一天的日期
val etlDate = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyyMMdd"))
// 读取MySQL中的user_info表
val user_info_df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "user_info")
.option("user", jdbcUser)
.option("password", jdbcPassword)
.load()
// 获取Hudi表中的增量数据
val hudiConfig = getQuickstartWriteConfigs
val hudiOptions = Map(HIVE_SYNC_ENABLED_OPT_KEY -> "true", HIVE_PARTITION_FIELDS_OPT_KEY -> partitionField, HIVE_DATABASE_OPT_KEY -> "ods_ds_hudi", HIVE_TABLE_OPT_KEY -> hudiTableName, HIVE_URL_OPT_KEY -> "jdbc:hive2://localhost:10000")
val hudiDf = spark.read.format("org.apache.hudi")
.options(hudiOptions)
.load(hudiTablePath + "/*/*/*/*")
// 取MySQL中每条数据的operate_time和create_time中较大的那个时间作为增量字段
val user_info_incremental_df = user_info_df
.filter("operate_time is not null or create_time is not null")
.withColumn("incremental_time", when($"operate_time".isNull, $"create_time").otherwise(when($"create_time".isNull, $"operate_time").otherwise(greatest($"operate_time", $"create_time"))))
.drop("operate_time", "create_time")
.withColumnRenamed("incremental_time", preCombineField)
// 将新增的数据抽入Hudi表中
val user_info_new_df = user_info_incremental_df.join(hudiDf, primaryKey)
.where(s"${hudiTableName}.${preCombineField} < ${user_info_incremental_df}.${preCombineField}")
.select(user_info_incremental_df.columns.head, user_info_incremental_df.columns.tail: _*)
.withColumn(partitionField, lit(etlDate))
// 将数据写入Hudi表中
user_info_new_df.write.format("org.apache.hudi")
.options(hudiConfig)
.option(PRECOMBINE_FIELD_OPT_KEY, preCombineField)
.option(RECORDKEY_FIELD_OPT_KEY, primaryKey)
.option(PARTITIONPATH_FIELD_OPT_KEY, partitionField)
.option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
.option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
.mode(SaveMode.Append)
.save(hudiTablePath)
}
}
```
在该代码示例中,我们使用了Apache Hudi库来实现增量数据导入,通过读取MySQL中的user_info表,并将其增量数据与Hudi表中的数据进行比较,只将新增的数据抽入Hudi表中。同时,我们添加了分区字段etl_date,将数据按照日期进行分区。其中,primaryKey指定为id,preCombineField指定为operate_time,partitionField指定为etl_date。
环境说明: 服务端登录地址详见各任务服务端说明。 补充说明:各主机可通过Asbru工具或SSH客户端进行SSH访问; 主节点MySQL数据库用户名/密码:root/123456(已配置远程连接); Spark任务在Yarn上用Client运行,方便观察日志。 子任务一:数据抽取 编写Scala代码,使用Spark将MySQL库中表ChangeRecord,BaseMachine,MachineData, ProduceRecord全量抽取到Hudi的hudi_gy_ods库(路径为/user/hive/warehouse/hudi_gy_ods.db)中对应表changerecord,basemachine, machinedata,producerecord中。 1、 抽取MySQL的shtd_industry库中ChangeRecord表的全量数据进入Hudi的hudi_gy_ods库中表changerecord,字段排序、类型不变,分区字段为etldate,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。PRECOMBINE_FIELD使用ChangeEndTime,ChangeID和ChangeMachineID作为联合主键。使用spark-sql的cli执行select count(*) from hudi_gy_ods.changerecord命令,将spark-sql的cli执行结果分别截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
抽取MySQL的shtd_industry库中ChangeRecord表的全量数据进入Hudi的hudi_gy_ods库中表changerecord的Scala代码如下:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
object ChangeRecordDataExtract {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("ChangeRecordDataExtract")
.enableHiveSupport()
.getOrCreate()
val etlDate = args(0)
val hoodieOptions = Map(
TABLE_TYPE_OPT_VAL -> MOR_TABLE_TYPE_OPT_VAL,
TABLE_NAME -> "changerecord",
RECORDKEY_FIELD_OPT_KEY -> "ChangeID",
PRECOMBINE_FIELD_OPT_KEY -> "ChangeEndTime,ChangeID,ChangeMachineID",
PARTITIONPATH_FIELD_OPT_KEY -> "etldate",
OPERATION_OPT_KEY -> UPSERT_OPERATION_OPT_VAL,
KEYGENERATOR_CLASS_OPT_KEY -> classOf[SimpleKeyGenerator].getName,
PATH_OPT_KEY -> s"/user/hive/warehouse/hudi_gy_ods.db/changerecord",
HUDI_CLEANER_POLICY_OPT_KEY -> "KEEP_LATEST_COMMITS",
META_SYNC_ENABLED_OPT_KEY -> "false",
HIVE_STYLE_PARTITIONING_OPT_KEY -> "true",
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
HIVE_PARTITION_FIELDS_OPT_KEY -> "etldate",
HIVE_DATABASE_OPT_KEY -> "hudi_gy_ods",
HIVE_TABLE_OPT_KEY -> "changerecord"
)
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_industry"
val jdbcProperties = new java.util.Properties()
jdbcProperties.setProperty("user", "root")
jdbcProperties.setProperty("password", "123456")
val changeRecordDF = spark.read.jdbc(jdbcUrl, "ChangeRecord", jdbcProperties)
.withColumn("etldate", lit(etlDate))
changeRecordDF.write
.format("org.apache.hudi")
.options(hoodieOptions)
.mode("append")
.save()
spark.sql("select count(*) from hudi_gy_ods.changerecord").show()
spark.stop()
}
}
```
其中,etlDate为当前比赛日的前一天日期,使用lit函数将该值添加为一个新的列etldate。hoodieOptions为Hudi的写入配置,对应Hudi表changerecord。jdbcUrl和jdbcProperties为连接MySQL的配置,使用spark.read.jdbc读取MySQL表ChangeRecord的数据。最后使用Hudi的save方法将数据写入Hudi表中,并使用spark.sql查询Hudi表changerecord的数据量。
阅读全文