HUDI preCombinedField 总结

阅读 63

2022-01-04

前言

总结 HUDI preCombinedField,分两大类总结,一类是Spark SQL,这里指的是merge,因为只有merge语句中有多条记录,讨论preCombinedField才有意义;一类是Spark DF,HUDI0.9版本支持SQL建表和增删改查

总结

先说结论:

PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field

说明:
1、这里有ts代表设置了preCombinedField字段
2、hudi默认使用布隆索引,布隆索引只保证同一分区下同一个主键对应的值唯一,可以使用全局索引保证所有分区值唯一,这里不展开细说

    private String getDefaultIndexType(EngineType engineType) {
      switch (engineType) {
        case SPARK:
          return HoodieIndex.IndexType.BLOOM.name();
        case FLINK:
        case JAVA:
          return HoodieIndex.IndexType.INMEMORY.name();
        default:
          throw new HoodieNotSupportedException("Unsupported engine " + engineType);
      }
    }

3、如果在测试过程中,发现和我的结论不一致,可能和后面的注意事项有关。
4、当指定了hoodie.datasource.write.insert.drop.duplicates=true时,不管是insert还是upsert,如果存在历史数据则不更新。实际在源码中,如果为upsert,也会修改为insert。

if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
  operation == WriteOperationType.UPSERT) {

  log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
    s"when $INSERT_DROP_DUPS is set to be true, " +
    s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")

  operation = WriteOperationType.INSERT
}

Spark DF

先说DF建表,DF写hudi表时,默认情况下,hudi,必须指定preCombinedField,否则,会抛出异常(当为insert等其他类型时,preCombinedField可以不用设置,具体见后面的源码解读部分),示例如下

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .master("local[*]")
    .appName("TestHuDiPreCombinedFiled")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()

val tableName = "test_hudi_table"
val data = Array((7, "name12", 1.21, 108L, "2021-05-06"), (7, "name2", 2.22, 108L, "2021-05-06"),
      (7, "name3", 3.45, 108L, "2021-05-06")
    )

val df = spark.createDataFrame(data).toDF("id", "name", "price", "ts", "dt")
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD.key(), "ts"). //指定preCombinedField=ts
  option(RECORDKEY_FIELD.key(), "id").
  option(PARTITIONPATH_FIELD.key(), "dt").
  option(HIVE_STYLE_PARTITIONING.key(), true). //hive 分区路径的格式是否和hive一样,如果true,则:分区字段=
  option("hoodie.table.name", tableName).
  //      option("hoodie.datasource.write.insert.drop.duplicates", true). //不更新
  //      option(OPERATION.key(), "INSERT").
  option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator").
  mode("append").
  save(s"/tmp/${tableName}")
val read_df = spark.
read.
format("hudi").
load(s"/tmp/${tableName}" + "/*")

read_df.show()

附hoodie.properties:

#Properties saved on Sat Jul 10 15:08:16 CST 2021
#Sat Jul 10 15:08:16 CST 2021
hoodie.table.precombine.field=ts
hoodie.table.name=test_hudi_table
hoodie.archivelog.folder=archived
hoodie.table.type=COPY_ON_WRITE
hoodie.table.version=1
hoodie.timeline.layout.version=1
hoodie.table.partition.columns=dt

可见,hudi表元数据里有hoodie.table.precombine.field=ts,代表preCombinedField生效

SQL

SQL与DF不同,分为两种,有预合并和没有预合并

没有预合并

SQL默认没有预合并

spark.sql(
  s"""
     | create table ${tableName} (
     |  id int,
     |  name string,
     |  price double,
     |  ts long,
     |  dt string
     |) using hudi
     | partitioned by (dt)
     | options (
     |  primaryKey = 'id',
     |  type = 'cow'
     | )
     | location '/tmp/${tableName}'
     |""".stripMargin)
spark.sql(s"show create table ${tableName}").show(false)
spark.sql(
  s"""
     |merge into ${tableName} as t0
     |using (
     |  select 1 as id, 'hudi' as name, 97 as price, 99 as ts, '2021-05-05' as dt,'INSERT' as opt_type union
     |  select 1 as id, 'hudi_2' as name, 98 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
     |  select 1 as id, 'hudi_2' as name, 99 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
     |  select 3 as id, 'hudi' as name, 10 as price, 110 as ts, '2021-05-05' as dt ,'DELETE' as opt_type
     | ) as s0
     |on t0.id = s0.id
     |when matched and opt_type!='DELETE' then update set *
     |when matched and opt_type='DELETE' then delete
     |when not matched and opt_type!='DELETE' then insert *
     |""".stripMargin)
spark.table(tableName).show()

查看hoodie.properties和在spark.sql(s"show create table ${tableName}").show(false)打印信息里发现表的元数据信息确实没有preCombinedField,示例中虽然有ts字段,但是没有没有显示设置,当然可以直接去掉ts字段,大家可以自行测试。

有预合并

spark.sql(
  s"""
     | create table ${tableName} (
     |  id int,
     |  name string,
     |  price double,
     |  ts long,
     |  dt string
     |) using hudi
     | partitioned by (dt)
     | options (
     |  primaryKey = 'id',
     |  preCombineField = 'ts',
     |  type = 'cow'
     | )
     | location '/tmp/${tableName}'
     |""".stripMargin)
spark.sql(s"show create table ${tableName}").show(false)
spark.sql(
  s"""
     |merge into ${tableName} as t0
     |using (
     |  select 1 as id, 'hudi' as name, 97 as price, 99 as ts, '2021-05-05' as dt,'INSERT' as opt_type union
     |  select 1 as id, 'hudi_2' as name, 98 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
     |  select 1 as id, 'hudi_2' as name, 99 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
     |  select 3 as id, 'hudi' as name, 10 as price, 110 as ts, '2021-05-05' as dt ,'DELETE' as opt_type
     | ) as s0
     |on t0.id = s0.id
     |when matched and opt_type!='DELETE' then update set *
     |when matched and opt_type='DELETE' then delete
     |when not matched and opt_type!='DELETE' then insert *
     |""".stripMargin)
spark.table(tableName).show()

SQL的唯一的区别是在建表语句中加了配置preCombineField = ‘ts’,同样可以在hoodie.properties和打印信息里查看是否有hoodie.table.precombine.field=ts信息。

SQL与DF结合

先用SQL建表,再用DF写数据。这种情况主要是想建表时不想多一列ts字段,而在预合并时可以添加一列预合并字段进行去重,因为目前的版本SQL没有实现该功能。在SQL建表时如果指定了 preCombineField = ‘ts’,则表结构中必须有ts这个字段。

    val tableName = "test_hudi_table4"
    spark.sql(
      s"""
         | create table ${tableName} (
         |  id int,
         |  name string,
         |  price double,
         |  dt string
         |) using hudi
         | partitioned by (dt)
         | options (
         |  primaryKey = 'id',
         |  type = 'cow'
         | )
         | location '/tmp/${tableName}'
         |""".stripMargin)


    val data = Array((7, "name12", 1.21, 106L, "2021-05-06"), (7, "name2", 2.22, 108L, "2021-05-06"),
      (7, "name3", 3.45, 107L, "2021-05-06")
    )

    val df = spark.createDataFrame(data).toDF("id", "name", "price", "ts", "dt")

    df.show()
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD.key(), "ts"). //指定preCombinedField=ts
      option(RECORDKEY_FIELD.key(), "id").
      option(PARTITIONPATH_FIELD.key(), "dt").
      option(HIVE_STYLE_PARTITIONING.key(), true). //hive 分区路径的格式是否和hive一样,如果true,则:分区字段=
      option("hoodie.table.name", tableName).
      //      option("hoodie.datasource.write.insert.drop.duplicates", true). //不更新
      //      option(OPERATION.key(), "INSERT").
      option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator").
      save(s"/tmp/${tableName}")

    val read_df = spark.
      read.
      format("hudi").
      load(s"/tmp/${tableName}" + "/*")

    read_df.show()
    spark.table(tableName).show()

上面的程序主要是用SQL先建了表的元数据,然后再用程序指定了PRECOMBINE_FIELD_OPT_KEY=ts,这样就实现了既可以预合并去重,也不用在建表中指定ts字段。但是打印中发现用程序读parquet文件时多了ts列,读表时因为元数据里没有ts列,没有打印出来,实际文件存储的有ts这一列。

上面只是模拟了这一场景,而我们想实现的时下面的

spark.sql(
  s"""
     | create table ${tableName} (
     |  id int,
     |  name string,
     |  price double,
     |  dt string
     |) using hudi
     | partitioned by (dt)
     | options (
     |  primaryKey = 'id',
     |  preCombineField = 'ts',
     |  type = 'cow'
     | )
     | location '/tmp/${tableName}'
     |""".stripMargin)
spark.sql(
  s"""
     |merge into ${tableName} as t0
     |using (
     |  select 1 as id, 'hudi' as name, 97 as price, 99 as ts, '2021-05-05' as dt,'INSERT' as opt_type union
     |  select 1 as id, 'hudi_2' as name, 98 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
     |  select 1 as id, 'hudi_2' as name, 99 as price, 99 as ts, '2021-05-05' as dt,'UPDATE' as opt_type union
     |  select 3 as id, 'hudi' as name, 10 as price, 110 as ts, '2021-05-05' as dt ,'DELETE' as opt_type
     | ) as s0
     |on t0.id = s0.id
     |when matched and opt_type!='DELETE' then update set *
     |when matched and opt_type='DELETE' then delete
     |when not matched and opt_type!='DELETE' then insert *
     |""".stripMargin)   

在建表时指定了preCombineField = ‘ts’,但是表结构中没有ts字段,而且后面的merge sql拼接时添加这一列。目前master分支还不支持这种情况,如果想实现这一情况,可以自己尝试修改源码支持。

代码

示例代码已上传到gitee,由于公司网把github屏蔽了,以后暂时转到gitee上。

源码解读

解读部分源码

程序写hudi时ts的必须性

默认配置时,如果不指定PRECOMBINE_FIELD_OPT_KEY,则会抛出以下异常:

21/07/13 20:04:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.hudi.exception.HoodieException: ts(Part -ts) field not found in record. Acceptable fields were :[id, name, price, tss, dt]
	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:437)
	at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:233)
	at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$1.apply(HoodieSparkSqlWriter.scala:230)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

对应的源码:
HoodieSparkSqlWriter.scala 230、233行

230	val hoodieAllIncomingRecords = genericRecords.map(gr => {
		val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
		val hoodieRecord = if (shouldCombine) {
233	val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
			.asInstanceOf[Comparable[_]]
			DataSourceUtils.createHoodieRecord(processedRecord,
			orderingVal, keyGenerator.getKey(gr),
			hoodieConfig.getString(PAYLOAD_CLASS_NAME))
		} else {
			DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
		}
		hoodieRecord
		}).toJavaRDD()

getNestedFieldVal

public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound) {
    String[] parts = fieldName.split("\\.");
    ......
    if (returnNullIfNotFound) {
      return null;
434 } else if (valueNode.getSchema().getField(parts[i]) == null) {
      throw new HoodieException(
          fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
437           + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
    } else {
      throw new HoodieException("The value of " + parts[i] + " can not be null");
    }

233行,如果shouldCombinetrue,则会调用getNestedFieldVal,并将PRECOMBINE_FIELD_OPT_KEY的值作为fieldName参数传给getNestedFieldVal,而在getNestedFieldVal的434行发现当PRECOMBINE_FIELD_OPT_KEY的值null时抛出上面的异常。

可以发现当shouldCombine==true,才会调用getNestedFieldVal,才会抛出该异常,而shouldCombine何时为true呢

val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
  operation.equals(WriteOperationType.UPSERT) ||
  parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
    HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean

当INSERT_DROP_DUPS为true或者操作类型为UPSERT时,shouldCombine为true,默认的INSERT_DROP_DUPS=false


  /**
   * Flag to indicate whether to drop duplicates upon insert.
   * By default insert will accept duplicates, to gain extra performance.
   */
  val INSERT_DROP_DUPS: ConfigProperty[String] = ConfigProperty
    .key("hoodie.datasource.write.insert.drop.duplicates")
    .defaultValue("false")
    .withDocumentation("If set to true, filters out all duplicate records from incoming dataframe, during insert operations.")

也就是默认情况下,upsert操作,ts是必须的,而insert等其他操作可以没有ts值。这样我们就可以根据实际情况灵活运用了。

注意

用SQL创建新表或者DF append模式创建新表时,如果对应的数据目录已存在,需要先将文件夹删掉,因为hoodie.properties里保存了表的元数据信息,程序里会根据文件信息判断表是否存在,如果存在,会复用旧表的元数据。这种情况存在于想用同一个表测试上面多种情况

精彩评论(0)

0 0 举报