本代码分析基于Flink1.13.1、Hudi0.10.0
-- 生成数据
-- Source table backed by the datagen connector: emits random rows for testing.
CREATE TABLE datagen (
  id       BIGINT,
  prod_id  BIGINT,
  price    INT,
  uid      BIGINT,
  buy_time TIMESTAMP
) WITH (
  'connector' = 'datagen'
);
-- hudi数据表
-- Hudi sink table; MERGE_ON_READ writes row changes into delta logs that are
-- later compacted into Parquet base files.
CREATE TABLE t_hudi (
  id       BIGINT,
  prod_id  BIGINT,
  price    INT,
  uid      BIGINT,
  buy_time TIMESTAMP
) WITH (
  'connector' = 'hudi',
  'path' = '${path}',
  'table.type' = 'MERGE_ON_READ'
);
insert into t_hudi select * from datagen;
通过分析Hudi connector的META-INF/services/org.apache.flink.table.factories.Factory文件
(其中注册了HoodieTableFactory,由它创建HoodieTableSink,涉及Flink dynamic table机制),
可以发现Flink写入Hudi的所有流程都集中在HoodieTableSink中。
下面是对HoodieTableSink Pipelines的分析:涉及bootstrap、hoodieStreamWrite、compact
Pipelines.bootstrapFlink流处理调用streamBootstrap
public static DataStreamRowDataToHoodieFunctionsbootstrap( Configuration conf, RowType rowType, int defaultParallelism, DataStream dataStream, boolean bounded, boolean overwrite) { final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED); if (overwrite) { return rowDataToHoodieRecord(conf, rowType, dataStream); } else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) { return boundedBootstrap(conf, rowType, defaultParallelism, dataStream); } else { return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded); } } private static DataStream streamBootstrap( Configuration conf, RowType rowType, int defaultParallelism, DataStream dataStream, boolean bounded) { DataStream dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream); if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) { dataStream1 = dataStream1 .transform( "index_bootstrap", TypeInformation.of(HoodieRecord.class), new BootstrapOperator<>(conf)) .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism)) .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); } return dataStream1; } public static DataStream rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream dataStream) { return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class)); }
/**
 * Converts a single Flink row into a {@code HoodieRecord}:
 * 1. converts the row to an Avro {@code GenericRecord};
 * 2. derives the {@code HoodieKey} (record key + partition path) via the key generator;
 * 3. wraps the Avro record in a payload and tags it with the row's change kind
 *    (insert/update/delete) so Hudi can apply CDC semantics.
 *
 * @param record the incoming row (Flink {@code RowData})
 * @return the equivalent HoodieRecord
 * @throws Exception if Avro conversion or payload creation fails
 */
private HoodieRecord toHoodieRecord(I record) throws Exception {
  GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
  final HoodieKey hoodieKey = keyGenerator.getKey(gr);
  HoodieRecordPayload payload = payloadCreation.createPayload(gr);
  // Fix: RowKind's byte accessor is toByteValue() ("toBytevalue" in the excerpt
  // was a transcription error and would not compile).
  HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue());
  return new HoodieRecord<>(hoodieKey, payload, operation);
}
这里只展开讲解RowDataToHoodieFunction,它负责将Flink的RowData数据转化成HoodieRecord:
1.首先将RowData转成GenericRecord(avro格式)
2.根据规则生成HoodieKey
3.根据RowData的RowKind获取HoodieOperation,使hudi可以处理增删改事件
BootstrapOperator:
1.加载hudi索引
2.waitForBootstrapReady等待其他subTask准备完成
Pipelines.hoodieStreamWrite:主要涉及BucketAssignFunction与WriteOperatorFactory
BucketAssignFunction:指定数据保存在哪个文件中,解决小文件的问题
WriteOperatorFactoryHudi中最复杂的部分,分为StreamWriteFunction、CoordinatedOperatorFactory
StreamWriteFunction private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
if (instant == null) {
// in case there are empty checkpoints that has no input data
LOG.info("No inflight instant when flushing data, skip.");
return false;
}
List records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
bucket.preWrite(records);
final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
records.clear();
final WritemetadataEvent event = WritemetadataEvent.builder()
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
.writeStatus(writeStatus)
.lastBatch(false)
.endInput(false)
.build();
this.eventGateway.sendEventToCoordinator(event);
writeStatuses.addAll(writeStatus);
return true;
}
/**
 * Binds {@code writeFunction} to the client call matching the configured
 * {@code write.operation} option. The lambdas intentionally read
 * {@code this.writeClient} at invocation time rather than capturing it here.
 */
private void initWriteFunction() {
  final String operation = this.config.get(FlinkOptions.OPERATION);
  switch (WriteOperationType.fromValue(operation)) {
    case INSERT:
      this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
      break;
    case UPSERT:
      this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
      break;
    case INSERT_OVERWRITE:
      this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
      break;
    case INSERT_OVERWRITE_TABLE:
      this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
      break;
    default:
      // Anything else (e.g. BULK_INSERT handled elsewhere) is rejected here.
      throw new RuntimeException("Unsupported write operation : " + operation);
  }
}
1.获取写入的instantTime Hudi Timeline
2.负责调用HoodieFlinkWriteClient将数据写入HoodieLog(具体写入格式参考HoodieLogFormatWriter)
3.生成WriteMetadataEvent发送到OperatorEventGateway
WriteOperatorFactory的两个核心方法是createStreamOperator与getCoordinatorProvider:
1.createStreamOperator用于初始化StreamWriteOperator、StreamWriteFunction
2.getCoordinatorProvider用于创建StreamWriteOperatorCoordinator。
- OperatorCoordinator运行在JobManager上,通过SubtaskGateway和OperatorEventHandler在OperatorCoordinator和算子之间传递数据。StreamWriteOperatorCoordinator用于在Timeline上保存HoodieLog commit记录,为了保证同一时间只有一个线程在执行commit任务。
// Runs on the JobManager (StreamWriteOperatorCoordinator). On checkpoint
// completion, commits the current instant on a single-threaded executor so
// only one commit is in flight at a time, then (for MOR tables) schedules a
// compaction if configured.
// NOTE(review): the lambda reads this.instant when it EXECUTES, not when it is
// submitted — the trailing "%s" message argument is formatted the same way.
@Override
public void notifyCheckpointComplete(long checkpointId) {
executor.execute(
() -> {
// commitInstant returns whether anything was actually committed.
final boolean committed = commitInstant(this.instant, checkpointId);
if (tableState.scheduleCompaction) {
// if async compaction is on, schedule the compaction
CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
}
}, "commits the instant %s", this.instant
);
}
/**
 * Attempts to commit the given instant from the buffered per-subtask
 * {@code WriteMetadataEvent}s.
 *
 * @param instant      the instant time to commit
 * @param checkpointId the checkpoint that triggered this commit
 * @return true if a commit was performed, false when there is nothing to commit
 */
private boolean commitInstant(String instant, long checkpointId) {
  if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
    // The last checkpoint finished successfully.
    return false;
  }
  // Flatten the write statuses reported by every subtask.
  List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
      .filter(Objects::nonNull)
      .map(WriteMetadataEvent::getWriteStatuses)
      .flatMap(Collection::stream)
      .collect(Collectors.toList());
  if (writeResults.isEmpty()) {
    // No data has written, reset the buffer and returns early
    reset();
    // Send commit ack event to the write function to unblock the flushing
    sendCommitAckEvents(checkpointId);
    return false;
  }
  doCommit(instant, writeResults);
  return true;
}
private void doCommit(String instant, List writeResults) {
// commit or rollback
long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
boolean hasErrors = totalErrorRecords > 0;
if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
HashMap checkpointCommitmetadata = new HashMap<>();
if (hasErrors) {
LOG.warn("Some records failed to merge but forcing commit since commitonErrors set to true. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
}
final Map> partitionToReplacedFileIds = tableState.isOverwrite
? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
: Collections.emptyMap();
boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitmetadata),
tableState.commitAction, partitionToReplacedFileIds);
if (success) {
reset();
LOG.info("Commit instant [{}] success!", instant);
} else {
throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
}
} else {
LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
LOG.error("The first 100 error messages");
writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
LOG.error("Global error for partition path {} and fileID {}: {}",
ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
if (ws.getErrors().size() > 0) {
ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));
}
});
// Rolls back instant
writeClient.rollback(instant);
throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
}
}
Pipelines.compact
CompactionPlanOperator
根据之前写入的HoodieLog生成压缩计划,并拆分操作交给CompactFunction
// Fragment from CompactionPlanOperator (not a complete method): loads the
// compaction plan previously scheduled on the timeline for this instant.
// NOTE(review): `getmetaClient` looks like a transcription of `getMetaClient` —
// confirm against the Hudi 0.10.0 source.
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getmetaClient(), compactionInstantTime);
CompactFunction
执行HoodieLog压缩,生成Parquet文件
private void doCompaction(String instantTime, CompactionOperation compactionOperation, CollectorCompactionCommitSinkcollector) throws IOException { HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); List writeStatuses = compactor.compact( new HoodieFlinkCopyOnWriteTable<>( writeClient.getConfig(), writeClient.getEngineContext(), writeClient.getHoodieTable().getmetaClient()), writeClient.getHoodieTable().getmetaClient(), writeClient.getConfig(), compactionOperation, instantTime, writeClient.getHoodieTable().getTaskContextSupplier()); collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID)); }
等待压缩计划的所有操作都完成后,commit合并操作



