总结 HUDI preCombinedField,分两大类总结,一类是Spark SQL,这里指的是merge,因为只有merge语句中有多条记录,讨论preCombinedField才有意义;一类是Spark DF,HUDI0.9版本支持SQL建表和增删改查
总结先说结论:
Spark DF建表写数据时(含更新):
1、UPSERT,当数据重复时(这里指同一主键对应多条记录),程序在写数据前会根据预合并字段ts进行去重,去重保留ts值最大的那条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
2、INSERT时,没有预合并,程序依次写入,实际更新为最后一条记录,且无论新记录的ts值是否大于历史记录的ts值,都会覆盖写,直接更新。
Spark SQL建表,写数据时(含更新):
有ts时,预合并时如果数据重复取预合并字段值最大的那条记录,最大值相同的取第一个。写数据时,ts值大于等于历史ts值,才会更新,小于历史值则不更新。
没有ts时,则默认将主键字段的第一个值作为预合并字段,如果数据重复,去重时会取第一个值,写数据时,直接覆盖历史数据(因为这里的预合并字段为主键字段,等于历史值,其实原理跟上面有ts时一样)
PRECOMBINE_FIELD.key -> targetKey2Sourceexpression.keySet.head, // set a default preCombine field
说明:
1、这里有ts代表设置了preCombinedField字段
2、hudi默认使用布隆索引,布隆索引只保证同一分区下同一个主键对应的值唯一,可以使用全局索引保证所有分区值唯一,这里不展开细说
private String getDefaultIndexType(EngineType engineType) {
switch (engineType) {
case SPARK:
return HoodieIndex.IndexType.BLOOM.name();
case Flink:
case JAVA:
return HoodieIndex.IndexType.INMEMORY.name();
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}
3、如果在测试过程中,发现和我的结论不一致,可能和后面的注意事项有关。
4、当指定了hoodie.datasource.write.insert.drop.duplicates=true时,不管是insert还是upsert,如果存在历史数据则不更新。实际在源码中,如果为upsert,也会修改为insert。
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
operation == WriteOperationType.UPSERT) {
log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
s"when $INSERT_DROP_DUPS is set to be true, " +
s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
operation = WriteOperationType.INSERT
}
Spark DF
先说DF建表,DF写hudi表时,默认情况下,hudi,必须指定preCombinedField,否则,会抛出异常(当为insert等其他类型时,preCombinedField可以不用设置,具体见后面的源码解读部分),示例如下
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local[*]")
.appName("TestHuDiPreCombinedFiled")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.getOrCreate()
val tableName = "test_hudi_table"
val data = Array((7, "name12", 1.21, 108L, "2021-05-06"), (7, "name2", 2.22, 108L, "2021-05-06"),
(7, "name3", 3.45, 108L, "2021-05-06")
)
val df = spark.createDataframe(data).toDF("id", "name", "price", "ts", "dt")
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD.key(), "ts"). //指定preCombinedField=ts
option(RECORDKEY_FIELD.key(), "id").
option(PARTITIONPATH_FIELD.key(), "dt").
option(HIVE_STYLE_PARTITIONING.key(), true). //hive 分区路径的格式是否和hive一样,如果true,则:分区字段=
option("hoodie.table.name", tableName).
// option("hoodie.datasource.write.insert.drop.duplicates", true). //不更新
// option(OPERATION.key(), "INSERT").
option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator").
mode("append").
save(s"/tmp/${tableName}")
val read_df = spark.
read.
format("hudi").
load(s"/tmp/${tableName}" + "
val INSERT_DROP_DUPS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.insert.drop.duplicates")
.defaultValue("false")
.withdocumentation("If set to true, filters out all duplicate records from incoming dataframe, during insert operations.")
也就是默认情况下,upsert操作,ts是必须的,而insert等其他操作可以没有ts值。这样我们就可以根据实际情况灵活运用了。
注意用SQL创建新表或者DF append模式创建新表时,如果对应的数据目录已存在,需要先将文件夹删掉,因为hoodie.properties里保存了表的元数据信息,程序里会根据文件信息判断表是否存在,如果存在,会复用旧表的元数据。这种情况存在于想用同一个表测试上面多种情况



