hudi什么版本支持bulk_insert
时间: 2023-08-31 14:03:57 浏览: 74
hudi是一种开源数据湖解决方案。对于支持bulk_insert的版本来说,Hudi 0.5.2及以上的版本都提供了bulk_insert的功能。
bulk_insert是一种用于一次性插入大量数据的方法,它可以在数据湖中快速加载大规模数据集。在Hudi中,使用bulk_insert可以将数据一次性写入到Hudi表中,而无需逐条写入数据。这样可以大大提高数据加载的速度和效率。
在0.5.2及以上版本的Hudi中,我们可以使用Spark或Flink等大数据处理框架来实现bulk_insert功能。具体操作方法是,将需要插入的数据以批量或分区的方式准备好,然后使用Hudi提供的API进行bulk_insert操作。Hudi会自动将这些数据加载到对应的数据湖表中,并保证数据的一致性和可靠性。
使用bulk_insert功能可以极大地提高数据加载的速度,尤其对于大规模数据集来说,效果更为明显。因此,对于需要一次性插入大量数据的场景,建议使用Hudi 0.5.2及以上版本来支持bulk_insert功能,以提高数据处理效率。
相关问题
抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField。使用spark-shell执行show partitions ods_ds_hudi.user_info命令
以下是抽取增量数据进入Hudi的代码:
```scala
import org.apache.spark.sql.functions._
import org.apache.hudi.QuickstartUtils._
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false&serverTimezone=UTC"
val dbProperties = new java.util.Properties()
dbProperties.setProperty("user", "root")
dbProperties.setProperty("password", "root")
val user_df = spark.read.jdbc(jdbcUrl, "user_info", dbProperties)
val hudi_options = Map[String, String](
HoodieWriteConfig.TABLE_NAME -> "user_info",
HoodieWriteConfig.RECORDKEY_FIELD_OPT_KEY -> "id",
HoodieWriteConfig.PRECOMBINE_FIELD_OPT_KEY -> "operate_time",
HoodieWriteConfig.PARTITIONPATH_FIELD_OPT_KEY -> "etl_date",
HoodieWriteConfig.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
HoodieWriteConfig.OPERATION_OPT_KEY -> "upsert",
HoodieWriteConfig.BULK_INSERT_SORT_MODE_OPT_KEY -> "GLOBAL_SORT",
HoodieWriteConfig.BULK_INSERT_INPUT_RECORDS_NUM_OPT_KEY -> "500",
HoodieWriteConfig.BULK_INSERT_PARALLELISM_OPT_KEY -> "2",
HoodieWriteConfig.FORMAT_OPT_KEY -> "org.apache.hudi",
HoodieWriteConfig.HIVE_SYNC_ENABLED_OPT_KEY -> "false",
HoodieWriteConfig.HIVE_DATABASE_OPT_KEY -> "default",
HoodieWriteConfig.HIVE_TABLE_OPT_KEY -> "user_info",
HoodieWriteConfig.HIVE_PARTITION_FIELDS_OPT_KEY -> "etl_date",
HoodieWriteConfig.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> "org.apache.hudi.hive.NonPartitionedExtractor",
HoodieWriteConfig.HOODIE_TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ"
)
val etl_date = java.time.LocalDate.now.minusDays(1).format(java.time.format.DateTimeFormatter.BASIC_ISO_DATE)
val hudi_df = user_df
.withColumn("etl_date", lit(etl_date))
.withColumn("operate_time", coalesce(col("operate_time"), col("create_time")))
.withColumn("operate_time_long", unix_timestamp(col("operate_time"), "yyyy-MM-dd HH:mm:ss"))
.withColumn("create_time_long", unix_timestamp(col("create_time"), "yyyy-MM-dd HH:mm:ss"))
.withColumn("increment_ts", greatest(col("operate_time_long"), col("create_time_long")))
.filter(col("increment_ts") >= unix_timestamp(lit(etl_date), "yyyyMMdd"))
.selectExpr("id", "username", "age", "gender", "create_time", "operate_time")
.repartition(2)
hudi_df.write
.format("org.apache.hudi")
.options(hudi_options)
.mode("append")
.save("hdfs://localhost:9000/user/hive/warehouse/ods_ds_hudi.db/user_info")
```
执行完毕后,可以在Hive中使用`show partitions ods_ds_hudi.user_info`命令查看分区情况。
scala抽取shtd_store库中user_info的增量数据进入Hudi的ods_ds_hudi库中表user_info。根据ods_ds_hudi.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加分区,若operate_time为空,则用create_time填充,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。id作为primaryKey,operate_time作为preCombineField
以下是Scala代码示例,实现抽取增量数据进入Hudi的ods_ds_hudi库中表user_info:
```scala
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.time.LocalDate
import java.time.format.DateTimeFormatter
object IncrementalDataImport {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("IncrementalDataImport")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store?useSSL=false&serverTimezone=UTC"
val jdbcUser = "root"
val jdbcPassword = "root"
val hudiTablePath = "/user/hive/warehouse/ods_ds_hudi.db/user_info"
val hudiTableName = "user_info"
val primaryKey = "id"
val preCombineField = "operate_time"
val partitionField = "etl_date"
// 获取当前比赛日前一天的日期
val etlDate = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyyMMdd"))
// 读取MySQL中的user_info表
val user_info_df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "user_info")
.option("user", jdbcUser)
.option("password", jdbcPassword)
.load()
// 获取Hudi表中的增量数据
val hudiConfig = getQuickstartWriteConfigs
val hudiOptions = Map(HIVE_SYNC_ENABLED_OPT_KEY -> "true", HIVE_PARTITION_FIELDS_OPT_KEY -> partitionField, HIVE_DATABASE_OPT_KEY -> "ods_ds_hudi", HIVE_TABLE_OPT_KEY -> hudiTableName, HIVE_URL_OPT_KEY -> "jdbc:hive2://localhost:10000")
val hudiDf = spark.read.format("org.apache.hudi")
.options(hudiOptions)
.load(hudiTablePath + "/*/*/*/*")
// 取MySQL中每条数据的operate_time和create_time中较大的那个时间作为增量字段
val user_info_incremental_df = user_info_df
.filter("operate_time is not null or create_time is not null")
.withColumn("incremental_time", when($"operate_time".isNull, $"create_time").otherwise(when($"create_time".isNull, $"operate_time").otherwise(greatest($"operate_time", $"create_time"))))
.drop("operate_time", "create_time")
.withColumnRenamed("incremental_time", preCombineField)
// 将新增的数据抽入Hudi表中
val user_info_new_df = user_info_incremental_df.join(hudiDf, primaryKey)
.where(s"${hudiTableName}.${preCombineField} < ${user_info_incremental_df}.${preCombineField}")
.select(user_info_incremental_df.columns.head, user_info_incremental_df.columns.tail: _*)
.withColumn(partitionField, lit(etlDate))
// 将数据写入Hudi表中
user_info_new_df.write.format("org.apache.hudi")
.options(hudiConfig)
.option(PRECOMBINE_FIELD_OPT_KEY, preCombineField)
.option(RECORDKEY_FIELD_OPT_KEY, primaryKey)
.option(PARTITIONPATH_FIELD_OPT_KEY, partitionField)
.option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
.option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
.mode(SaveMode.Append)
.save(hudiTablePath)
}
}
```
在该代码示例中,我们使用了Apache Hudi库来实现增量数据导入,通过读取MySQL中的user_info表,并将其增量数据与Hudi表中的数据进行比较,只将新增的数据抽入Hudi表中。同时,我们添加了分区字段etl_date,将数据按照日期进行分区。其中,primaryKey指定为id,preCombineField指定为operate_time,partitionField指定为etl_date。