hudi什么版本支持bulk_insert

时间: 2023-08-31 14:03:57 浏览: 74
hudi是一种开源数据湖解决方案。对于支持bulk_insert的版本来说,Hudi 0.5.2及以上的版本都提供了bulk_insert的功能。 bulk_insert是一种用于一次性插入大量数据的方法,它可以在数据湖中快速加载大规模数据集。在Hudi中,使用bulk_insert可以将数据一次性写入到Hudi表中,而无需逐条写入数据。这样可以大大提高数据加载的速度和效率。 在0.5.2及以上版本的Hudi中,我们可以使用Spark或Flink等大数据处理框架来实现bulk_insert功能。具体操作方法是,将需要插入的数据以批量或分区的方式准备好,然后使用Hudi提供的API进行bulk_insert操作。Hudi会自动将这些数据加载到对应的数据湖表中,并保证数据的一致性和可靠性。 使用bulk_insert功能可以极大地提高数据加载的速度,尤其对于大规模数据集来说,效果更为明显。因此,对于需要一次性插入大量数据的场景,建议使用Hudi 0.5.2及以上版本来支持bulk_insert功能,以提高数据处理效率。
相关问题

抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.user_info命令

以下是抽取增量数据进入Hudi的代码: ```scala import org.apache.spark.sql.functions._ import org.apache.hudi.QuickstartUtils._ val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false&serverTimezone=UTC" val dbProperties = new java.util.Properties() dbProperties.setProperty("user", "root") dbProperties.setProperty("password", "root") val user_df = spark.read.jdbc(jdbcUrl, "user_info", dbProperties) val hudi_options = Map[String, String]( HoodieWriteConfig.TABLE_NAME -> "user_info", HoodieWriteConfig.RECORDKEY_FIELD_OPT_KEY -> "id", HoodieWriteConfig.PRECOMBINE_FIELD_OPT_KEY -> "operate_time", HoodieWriteConfig.PARTITIONPATH_FIELD_OPT_KEY -> "etl_date", HoodieWriteConfig.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator", HoodieWriteConfig.OPERATION_OPT_KEY -> "upsert", HoodieWriteConfig.BULK_INSERT_SORT_MODE_OPT_KEY -> "GLOBAL_SORT", HoodieWriteConfig.BULK_INSERT_INPUT_RECORDS_NUM_OPT_KEY -> "500", HoodieWriteConfig.BULK_INSERT_PARALLELISM_OPT_KEY -> "2", HoodieWriteConfig.FORMAT_OPT_KEY -> "org.apache.hudi", HoodieWriteConfig.HIVE_SYNC_ENABLED_OPT_KEY -> "false", HoodieWriteConfig.HIVE_DATABASE_OPT_KEY -> "default", HoodieWriteConfig.HIVE_TABLE_OPT_KEY -> "user_info", HoodieWriteConfig.HIVE_PARTITION_FIELDS_OPT_KEY -> "etl_date", HoodieWriteConfig.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> "org.apache.hudi.hive.NonPartitionedExtractor", HoodieWriteConfig.HOODIE_TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ" ) val etl_date = java.time.LocalDate.now.minusDays(1).format(java.time.format.DateTimeFormatter.BASIC_ISO_DATE) val hudi_df = user_df .withColumn("etl_date", lit(etl_date)) .withColumn("operate_time", coalesce(col("operate_time"), col("create_time"))) .withColumn("operate_time_long", unix_timestamp(col("operate_time"), "yyyy-MM-dd HH:mm:ss")) .withColumn("create_time_long", unix_timestamp(col("create_time"), "yyyy-MM-dd HH:mm:ss")) .withColumn("increment_ts", greatest(col("operate_time_long"), col("create_time_long"))) .filter(col("increment_ts") >= unix_timestamp(lit(etl_date), "yyyyMMdd")) .selectExpr("id", "username", "age", "gender", "create_time", "operate_time") .repartition(2) hudi_df.write .format("org.apache.hudi") .options(hudi_options) .mode("append") .save("hdfs://localhost:9000/user/hive/warehouse/ods_ds_hudi.db/user_info") ``` 执行完毕后,可以在Hive中使用`show partitions ods_ds_hudi.user_info`命令查看分区情况。

scala抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField

以下是Scala代码示例,实现抽取增量数据进入Hudi的ods_ds_hudi库中表user_info: ```scala import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.QuickstartUtils._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.keygen.SimpleKeyGenerator import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import java.time.LocalDate import java.time.format.DateTimeFormatter object IncrementalDataImport { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("IncrementalDataImport") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .getOrCreate() val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false&serverTimezone=UTC" val jdbcUser = "root" val jdbcPassword = "root" val hudiTablePath = "/user/hive/warehouse/ods_ds_hudi.db/user_info" val hudiTableName = "user_info" val primaryKey = "id" val preCombineField = "operate_time" val partitionField = "etl_date" // 获取当前比赛日前一天的日期 val etlDate = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyyMMdd")) // 读取MySQL中的user_info表 val user_info_df = spark.read.format("jdbc") .option("url", jdbcUrl) .option("driver", "com.mysql.jdbc.Driver") .option("dbtable", "user_info") .option("user", jdbcUser) .option("password", jdbcPassword) .load() // 获取Hudi表中的增量数据 val hudiConfig = getQuickstartWriteConfigs val hudiOptions = Map(HIVE_SYNC_ENABLED_OPT_KEY -> "true", HIVE_PARTITION_FIELDS_OPT_KEY -> partitionField, HIVE_DATABASE_OPT_KEY -> "ods_ds_hudi", HIVE_TABLE_OPT_KEY -> hudiTableName, HIVE_URL_OPT_KEY -> "jdbc:hive2://localhost:10000") val hudiDf = spark.read.format("org.apache.hudi") .options(hudiOptions) .load(hudiTablePath + "/*/*/*/*") // 取MySQL中每条数据的operate_time和create_time中较大的那个时间作为增量字段 val user_info_incremental_df = user_info_df .filter("operate_time is not null or create_time is not null") .withColumn("incremental_time", when($"operate_time".isNull, $"create_time").otherwise(when($"create_time".isNull, $"operate_time").otherwise(greatest($"operate_time", $"create_time")))) .drop("operate_time", "create_time") .withColumnRenamed("incremental_time", preCombineField) // 将新增的数据抽入Hudi表中 val user_info_new_df = user_info_incremental_df.join(hudiDf, primaryKey) .where(s"${hudiTableName}.${preCombineField} < ${user_info_incremental_df}.${preCombineField}") .select(user_info_incremental_df.columns.head, user_info_incremental_df.columns.tail: _*) .withColumn(partitionField, lit(etlDate)) // 将数据写入Hudi表中 user_info_new_df.write.format("org.apache.hudi") .options(hudiConfig) .option(PRECOMBINE_FIELD_OPT_KEY, preCombineField) .option(RECORDKEY_FIELD_OPT_KEY, primaryKey) .option(PARTITIONPATH_FIELD_OPT_KEY, partitionField) .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE") .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName) .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) .mode(SaveMode.Append) .save(hudiTablePath) } } ``` 在该代码示例中,我们使用了Apache Hudi库来实现增量数据导入,通过读取MySQL中的user_info表,并将其增量数据与Hudi表中的数据进行比较,只将新增的数据抽入Hudi表中。同时,我们添加了分区字段etl_date,将数据按照日期进行分区。其中,primaryKey指定为id,preCombineField指定为operate_time,partitionField指定为etl_date。

相关推荐

最新推荐

recommend-type

Flink +hudi+presto 流程图.docx

Flink +hudi+presto 流程图.docx 自己实现后画的一个流程图,便于理解
recommend-type

起点小说解锁.js

起点小说解锁.js
recommend-type

RTL8188FU-Linux-v5.7.4.2-36687.20200602.tar(20765).gz

REALTEK 8188FTV 8188eus 8188etv linux驱动程序稳定版本, 支持AP,STA 以及AP+STA 共存模式。 稳定支持linux4.0以上内核。
recommend-type

管理建模和仿真的文件

管理Boualem Benatallah引用此版本:布阿利姆·贝纳塔拉。管理建模和仿真。约瑟夫-傅立叶大学-格勒诺布尔第一大学,1996年。法语。NNT:电话:00345357HAL ID:电话:00345357https://theses.hal.science/tel-003453572008年12月9日提交HAL是一个多学科的开放存取档案馆,用于存放和传播科学研究论文,无论它们是否被公开。论文可以来自法国或国外的教学和研究机构,也可以来自公共或私人研究中心。L’archive ouverte pluridisciplinaire
recommend-type

:YOLOv1目标检测算法:实时目标检测的先驱,开启计算机视觉新篇章

![:YOLOv1目标检测算法:实时目标检测的先驱,开启计算机视觉新篇章](https://img-blog.csdnimg.cn/img_convert/69b98e1a619b1bb3c59cf98f4e397cd2.png) # 1. 目标检测算法概述 目标检测算法是一种计算机视觉技术,用于识别和定位图像或视频中的对象。它在各种应用中至关重要,例如自动驾驶、视频监控和医疗诊断。 目标检测算法通常分为两类:两阶段算法和单阶段算法。两阶段算法,如 R-CNN 和 Fast R-CNN,首先生成候选区域,然后对每个区域进行分类和边界框回归。单阶段算法,如 YOLO 和 SSD,一次性执行检
recommend-type

ActionContext.getContext().get()代码含义

ActionContext.getContext().get() 是从当前请求的上下文对象中获取指定的属性值的代码。在ActionContext.getContext()方法的返回值上,调用get()方法可以获取当前请求中指定属性的值。 具体来说,ActionContext是Struts2框架中的一个类,它封装了当前请求的上下文信息。在这个上下文对象中,可以存储一些请求相关的属性值,比如请求参数、会话信息、请求头、应用程序上下文等等。调用ActionContext.getContext()方法可以获取当前请求的上下文对象,而调用get()方法可以获取指定属性的值。 例如,可以使用 Acti
recommend-type

c++校园超市商品信息管理系统课程设计说明书(含源代码) (2).pdf

校园超市商品信息管理系统课程设计旨在帮助学生深入理解程序设计的基础知识,同时锻炼他们的实际操作能力。通过设计和实现一个校园超市商品信息管理系统,学生掌握了如何利用计算机科学与技术知识解决实际问题的能力。在课程设计过程中,学生需要对超市商品和销售员的关系进行有效管理,使系统功能更全面、实用,从而提高用户体验和便利性。 学生在课程设计过程中展现了积极的学习态度和纪律,没有缺勤情况,演示过程流畅且作品具有很强的使用价值。设计报告完整详细,展现了对问题的深入思考和解决能力。在答辩环节中,学生能够自信地回答问题,展示出扎实的专业知识和逻辑思维能力。教师对学生的表现予以肯定,认为学生在课程设计中表现出色,值得称赞。 整个课程设计过程包括平时成绩、报告成绩和演示与答辩成绩三个部分,其中平时表现占比20%,报告成绩占比40%,演示与答辩成绩占比40%。通过这三个部分的综合评定,最终为学生总成绩提供参考。总评分以百分制计算,全面评估学生在课程设计中的各项表现,最终为学生提供综合评价和反馈意见。 通过校园超市商品信息管理系统课程设计,学生不仅提升了对程序设计基础知识的理解与应用能力,同时也增强了团队协作和沟通能力。这一过程旨在培养学生综合运用技术解决问题的能力,为其未来的专业发展打下坚实基础。学生在进行校园超市商品信息管理系统课程设计过程中,不仅获得了理论知识的提升,同时也锻炼了实践能力和创新思维,为其未来的职业发展奠定了坚实基础。 校园超市商品信息管理系统课程设计的目的在于促进学生对程序设计基础知识的深入理解与掌握,同时培养学生解决实际问题的能力。通过对系统功能和用户需求的全面考量,学生设计了一个实用、高效的校园超市商品信息管理系统,为用户提供了更便捷、更高效的管理和使用体验。 综上所述,校园超市商品信息管理系统课程设计是一项旨在提升学生综合能力和实践技能的重要教学活动。通过此次设计,学生不仅深化了对程序设计基础知识的理解,还培养了解决实际问题的能力和团队合作精神。这一过程将为学生未来的专业发展提供坚实基础,使其在实际工作中能够胜任更多挑战。
recommend-type

"互动学习:行动中的多样性与论文攻读经历"

多样性她- 事实上SCI NCES你的时间表ECOLEDO C Tora SC和NCESPOUR l’Ingén学习互动,互动学习以行动为中心的强化学习学会互动,互动学习,以行动为中心的强化学习计算机科学博士论文于2021年9月28日在Villeneuve d'Asq公开支持马修·瑟林评审团主席法布里斯·勒菲弗尔阿维尼翁大学教授论文指导奥利维尔·皮耶昆谷歌研究教授:智囊团论文联合主任菲利普·普雷教授,大学。里尔/CRISTAL/因里亚报告员奥利维耶·西格德索邦大学报告员卢多维奇·德诺耶教授,Facebook /索邦大学审查员越南圣迈IMT Atlantic高级讲师邀请弗洛里安·斯特鲁布博士,Deepmind对于那些及时看到自己错误的人...3谢谢你首先,我要感谢我的两位博士生导师Olivier和Philippe。奥利维尔,"站在巨人的肩膀上"这句话对你来说完全有意义了。从科学上讲,你知道在这篇论文的(许多)错误中,你是我可以依
recommend-type

:YOLO目标检测算法的挑战与机遇:数据质量、计算资源与算法优化,探索未来发展方向

![:YOLO目标检测算法的挑战与机遇:数据质量、计算资源与算法优化,探索未来发展方向](https://img-blog.csdnimg.cn/7e3d12895feb4651b9748135c91e0f1a.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBA5rKJ6YaJ77yM5LqO6aOO5Lit,size_20,color_FFFFFF,t_70,g_se,x_16) # 1. YOLO目标检测算法简介 YOLO(You Only Look Once)是一种
recommend-type

设计一个算法,输出在顺序表{3,6,2,10,1,8,5,7,4,9}中采用顺序方法查找关键字5的过程。

好的,我来回答你的问题。 在顺序表中查找关键字5的过程如下: 1. 从顺序表的第一个元素(即3)开始,逐个比较元素的值与目标值5是否相等。 2. 发现第一个元素3不等于目标值5,继续向下查找。 3. 查找到第三个元素2时,发现该元素也不等于目标值5,继续向下查找。 4. 查找到第四个元素10时,发现该元素也不等于目标值5,继续向下查找。 5. 查找到第五个元素1时,发现该元素也不等于目标值5,继续向下查找。 6. 查找到第六个元素8时,发现该元素也不等于目标值5,继续向下查找。 7. 查找到第七个元素5时,发现该元素等于目标值5,查找成功。 因此,顺序表中采用顺序方法查找关键