- Getting started example
- IcebergStreamWriter
- IcebergFilesCommitter
- Appendix: Flink task execution flow
- References
Flink can write both a DataStream<Row> and a DataStream<RowData> into Iceberg:
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

// forRowData() takes only the stream; the TableSchema is passed separately
FlinkSink.forRowData(input)
    .tableSchema(Flink_SCHEMA)
    .tableLoader(tableLoader)
    .writeParallelism(1)
    .build();

env.execute("Test Iceberg DataStream");
Here input is the input stream, in either DataStream<Row> or DataStream<RowData> form, and Flink_SCHEMA is a TableSchema.
Let's start with the build() method:
public DataStreamSink build() {
    Preconditions.checkArgument(this.rowDataInput != null, "Please use forRowData() to initialize the input DataStream.");
    Preconditions.checkNotNull(this.tableLoader, "Table loader shouldn't be null");

    if (this.table == null) {
        this.tableLoader.open();
        // try-with-resources closes the loader once the table has been loaded
        try (TableLoader loader = this.tableLoader) {
            this.table = loader.loadTable();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to load iceberg table from table loader: " + this.tableLoader, e);
        }
    }

    // Resolve the user-provided equality field column names to Iceberg field ids
    List<Integer> equalityFieldIds = Lists.newArrayList();
    if (this.equalityFieldColumns != null && this.equalityFieldColumns.size() > 0) {
        for (String column : this.equalityFieldColumns) {
            NestedField field = this.table.schema().findField(column);
            Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s", column, this.table.schema());
            equalityFieldIds.add(field.fieldId());
        }
    }

    RowType flinkRowType = FlinkSink.toFlinkRowType(this.table.schema(), this.tableSchema);
    // Redistribute the input records according to the table's write distribution mode
    this.rowDataInput = this.distributeDataStream(this.rowDataInput, this.table.properties(), this.table.spec(), this.table.schema(), flinkRowType);

    // The two core operators: the writer and the committer
    IcebergStreamWriter streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
    IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);

    this.writeParallelism = this.writeParallelism == null ? this.rowDataInput.getParallelism() : this.writeParallelism;

    // writer (writeParallelism) -> committer (parallelism 1) -> discarding sink
    DataStream<Void> returnStream = this.rowDataInput
        .transform(FlinkSink.ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
        .setParallelism(this.writeParallelism)
        .transform(FlinkSink.ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
        .setParallelism(1)
        .setMaxParallelism(1);

    return returnStream.addSink(new DiscardingSink())
        .name(String.format("IcebergSink %s", this.table.name()))
        .setParallelism(1);
}
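This is where the two core Iceberg write operators, IcebergStreamWriter and IcebergFilesCommitter, are created. The writer runs with writeParallelism, while the committer is pinned to a parallelism (and max parallelism) of 1.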
IcebergStreamWriter
IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
In build(), createStreamWriter() is called to create the IcebergStreamWriter:
static IcebergStreamWriter<RowData> createStreamWriter(Table table, RowType flinkRowType, List<Integer> equalityFieldIds) {
    Map<String, String> props = table.properties();
    long targetFileSize = getTargetFileSizeBytes(props);
    FileFormat fileFormat = getFileFormat(props);

    TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkRowType,
        table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat,
        props, equalityFieldIds);

    return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
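It builds a TaskWriterFactory (a RowDataTaskWriterFactory) from the table's schema, partition spec, location provider, FileIO, encryption manager, target file size, file format and properties, and passes it to IcebergStreamWriter.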
class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
        implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {

    private static final long serialVersionUID = 1L;

    private final String fullTableName;
    private final TaskWriterFactory<T> taskWriterFactory;

    private transient TaskWriter<T> writer;
    private transient int subTaskId;
    private transient int attemptId;

    IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory) {
        this.fullTableName = fullTableName;
        this.taskWriterFactory = taskWriterFactory;
        // Always chain the writer with its upstream operator
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    @Override
    public void open() {
        this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        this.attemptId = this.getRuntimeContext().getAttemptNumber();
        // Initialize the factory for this subtask/attempt, then create the first TaskWriter
        this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
        this.writer = this.taskWriterFactory.create();
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Close the current writer, emit its WriteResult downstream, and start a fresh writer
        this.emit(this.writer.complete());
        this.writer = this.taskWriterFactory.create();
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        this.writer.write(element.getValue());
    }

    // Remaining methods (e.g. endInput(), required by BoundedOneInput, and emit()) are omitted in this excerpt
}
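In open(), a TaskWriter is created from the taskWriterFactory passed into the constructor. RowDataTaskWriterFactory.create() looks like this: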
public TaskWriter<RowData> create() {
    Preconditions.checkNotNull(this.outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize().");

    if (this.equalityFieldIds != null && !this.equalityFieldIds.isEmpty()) {
        // Equality fields specified: delta writers, which can produce delete files as well as data files
        if (this.spec.isUnpartitioned()) {
            return new UnpartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds);
        } else {
            return new PartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds);
        }
    } else {
        // No equality fields: append-only writers
        if (this.spec.isUnpartitioned()) {
            return new UnpartitionedWriter<>(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                this.io, this.targetFileSizeBytes);
        } else {
            return new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(this.spec, this.format, this.appenderFactory,
                this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema);
        }
    }
}
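Depending on whether equality fields are specified and whether the table is partitioned, this method builds one of four writers: with equality fields, PartitionedDeltaWriter or UnpartitionedDeltaWriter; without them, RowDataPartitionedFanoutWriter or UnpartitionedWriter.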
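The branching in create() reduces to a two-by-two decision. As a minimal, dependency-free sketch, the hypothetical WriterSelection.chooseWriter() below reproduces only that decision and returns writer class names instead of constructing real Iceberg writers.

```java
import java.util.List;

// Hypothetical sketch: mirrors the branch structure of RowDataTaskWriterFactory.create(),
// but returns the writer class name instead of building a real Iceberg writer.
class WriterSelection {
    static String chooseWriter(List<Integer> equalityFieldIds, boolean unpartitioned) {
        boolean hasEqualityFields = equalityFieldIds != null && !equalityFieldIds.isEmpty();
        if (hasEqualityFields) {
            // Delta writers: can emit delete files as well as data files
            return unpartitioned ? "UnpartitionedDeltaWriter" : "PartitionedDeltaWriter";
        }
        // Append-only writers
        return unpartitioned ? "UnpartitionedWriter" : "RowDataPartitionedFanoutWriter";
    }
}
```

Usage: chooseWriter(Arrays.asList(1), false) yields "PartitionedDeltaWriter", while chooseWriter(null, true) yields "UnpartitionedWriter".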
Call chains of the four writers:
With equality fields:
UnpartitionedDeltaWriter -> BaseEqualityDeltaWriter.write() -> RollingFileWriter.write() -> appender.add()
PartitionedDeltaWriter -> BaseDeltaTaskWriter.write() -> RollingFileWriter.write() -> appender.add()
Without equality fields:
UnpartitionedWriter -> RollingFileWriter.write() -> appender.add()
RowDataPartitionedFanoutWriter -> BaseRollingWriter.write() -> RollingFileWriter.write() -> appender.add()
The appender at the bottom of every chain is created by the FlinkAppenderFactory that was passed in when the TaskWriter was constructed.
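All four chains end in RollingFileWriter, whose job is to keep output files near targetFileSizeBytes. The sketch below illustrates that rolling idea in plain Java under stated assumptions: RollingWriterSketch is a hypothetical class, each record carries an explicit byte size, and a "file" is just a list of records. It is not Iceberg's implementation, which checks the size of the file actually being written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "rolling" idea behind RollingFileWriter: keep appending to the
// current file and start a new one once its size reaches targetFileSizeBytes.
// Record sizes are passed in explicitly; a real writer measures the underlying file instead.
class RollingWriterSketch {
    private final long targetFileSizeBytes;
    private final List<List<String>> completedFiles = new ArrayList<>();
    private List<String> currentFile = null;
    private long currentSize = 0;

    RollingWriterSketch(long targetFileSizeBytes) {
        this.targetFileSizeBytes = targetFileSizeBytes;
    }

    void write(String record, long recordSizeBytes) {
        if (currentFile == null) {
            currentFile = new ArrayList<>();   // lazily open a new "file"
            currentSize = 0;
        }
        currentFile.add(record);               // stands in for appender.add()
        currentSize += recordSizeBytes;
        if (currentSize >= targetFileSizeBytes) {
            closeCurrent();                    // roll over: the next write opens a new file
        }
    }

    private void closeCurrent() {
        if (currentFile != null && !currentFile.isEmpty()) {
            completedFiles.add(currentFile);
        }
        currentFile = null;
        currentSize = 0;
    }

    // Like TaskWriter.complete(): close the open file and hand back every finished file
    List<List<String>> complete() {
        closeCurrent();
        List<List<String>> result = new ArrayList<>(completedFiles);
        completedFiles.clear();
        return result;
    }
}
```

With a target of 10 bytes and four 4-byte records a, b, c, d, the first file closes after c (12 bytes) and complete() returns [[a, b, c], [d]].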
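processElement() hands each record to writer.write(element.getValue()). Before each checkpoint barrier is forwarded, prepareSnapshotPreBarrier() completes the current writer and emits the resulting WriteResult to the committer, which commits it as part of the checkpoint.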
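To make the writer's lifecycle concrete, here is a dependency-free model of IcebergStreamWriter. StreamWriterLifecycle and SimpleTaskWriter are hypothetical stand-ins for the operator and Iceberg's TaskWriter, and a Supplier plays the role of taskWriterFactory. The point it shows: every checkpoint emits whatever the current writer produced (possibly nothing) and then switches to a fresh writer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, dependency-free model of the IcebergStreamWriter lifecycle.
class StreamWriterLifecycle {
    // Stand-in for TaskWriter: buffers rows and "writes" them to one file on complete()
    static class SimpleTaskWriter {
        private final List<String> rows = new ArrayList<>();

        void write(String row) {
            rows.add(row);
        }

        List<String> complete() {
            List<String> files = new ArrayList<>();
            if (!rows.isEmpty()) {
                files.add("file-with-" + rows.size() + "-rows");
            }
            return files;
        }
    }

    private final Supplier<SimpleTaskWriter> factory;             // plays the role of taskWriterFactory
    private final List<List<String>> emitted = new ArrayList<>(); // what would flow to the committer
    private SimpleTaskWriter writer;

    StreamWriterLifecycle(Supplier<SimpleTaskWriter> factory) {
        this.factory = factory;
    }

    void open() {
        writer = factory.get();                // create the first writer
    }

    void processElement(String row) {
        writer.write(row);                     // every record goes to the current writer
    }

    void prepareSnapshotPreBarrier(long checkpointId) {
        emitted.add(writer.complete());        // flush and emit this checkpoint's result
        writer = factory.get();                // fresh writer for the next checkpoint
    }

    List<List<String>> emitted() {
        return emitted;
    }
}
```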
Tip: a task runs in three phases: beforeInvoke() -> runMailboxLoop() -> afterInvoke().
beforeInvoke() calls initializeState() and open(); runMailboxLoop() calls processElement() to process the records.
IcebergFilesCommitter
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);
In build(), the IcebergFilesCommitter is created directly from tableLoader and overwrite.
Checkpoint-related initialization happens in IcebergFilesCommitter.initializeState():
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    this.flinkJobId = this.getContainingTask().getEnvironment().getJobID().toString();

    // Open the table loader and load the target table
    this.tableLoader.open();
    this.table = this.tableLoader.loadTable();

    int subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
    int attemptId = this.getRuntimeContext().getAttemptNumber();
    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(this.table, this.flinkJobId, subTaskId, (long) attemptId);
    this.maxCommittedCheckpointId = -1L;

    // Operator state: serialized manifests per checkpoint, and the Flink job id
    this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);

    if (context.isRestored()) {
        String restoredFlinkJobId = (String) ((Iterable) this.jobIdState.get()).iterator().next();
        Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId), "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");

        // Last checkpoint id this job committed, read back from the table's snapshot summaries
        this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(this.table, restoredFlinkJobId);

        // Re-commit every restored checkpoint newer than maxCommittedCheckpointId
        NavigableMap uncommittedDataFiles = Maps.newTreeMap((SortedMap) ((Iterable) this.checkpointsState.get()).iterator().next())
            .tailMap(this.maxCommittedCheckpointId, false);
        if (!uncommittedDataFiles.isEmpty()) {
            long maxUncommittedCheckpointId = (Long) uncommittedDataFiles.lastKey();
            this.commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
        }
    }
}
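Why tailMap(maxCommittedCheckpointId, false)? A job can fail after a checkpoint's state has been persisted, either before or after that checkpoint's Iceberg commit succeeded. The table's snapshot summaries record which checkpoint was really committed last, so only newer checkpoints are re-committed, which avoids committing the same files twice. A minimal sketch with plain java.util collections, where the hypothetical RestoreSketch.uncommitted() stands in for that one line and strings stand in for serialized manifests:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the restore step in IcebergFilesCommitter.initializeState():
// restoredState maps checkpointId -> serialized manifests (plain strings here).
class RestoreSketch {
    static NavigableMap<Long, String> uncommitted(NavigableMap<Long, String> restoredState,
                                                  long maxCommittedCheckpointId) {
        // tailMap(key, false) keeps only checkpoint ids strictly greater than maxCommittedCheckpointId
        return new TreeMap<>(restoredState).tailMap(maxCommittedCheckpointId, false);
    }
}
```

If checkpoints 1, 2 and 3 were restored and the table says checkpoint 1 was committed, only 2 and 3 remain, and lastKey() (3) becomes the checkpoint id passed to commitUpToCheckpoint().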
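When a checkpoint is taken, IcebergFilesCommitter.snapshotState() writes the collected results to manifests and saves them in operator state; the actual Iceberg commit happens later, in notifyCheckpointComplete():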
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    long checkpointId = context.getCheckpointId();
    LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", this.table, checkpointId);

    // Write this checkpoint's WriteResults into manifests and remember them under checkpointId
    this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId));

    // Persist all not-yet-committed checkpoints, plus the job id, into operator state
    this.checkpointsState.clear();
    this.checkpointsState.add(this.dataFilesPerCheckpoint);
    this.jobIdState.clear();
    this.jobIdState.add(this.flinkJobId);

    // Start collecting WriteResults for the next checkpoint
    this.writeResultsOfCurrentCkpt.clear();
}
this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId));
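This stores the serialized manifest metadata of the current checkpoint in dataFilesPerCheckpoint, keyed by checkpointId.
writeToManifest(), which produces the values of dataFilesPerCheckpoint, is implemented as follows: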
private byte[] writeToManifest(long checkpointId) throws IOException {
    // Nothing was written in this checkpoint: store a shared empty marker instead of a manifest
    if (this.writeResultsOfCurrentCkpt.isEmpty()) {
        return EMPTY_MANIFEST_DATA;
    }

    // Merge every WriteResult received in this checkpoint into one
    WriteResult result = WriteResult.builder().addAll(this.writeResultsOfCurrentCkpt).build();
    DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result,
        () -> this.manifestOutputFileFactory.create(checkpointId), this.table.spec());

    // Serialize with a version header so the bytes can be stored in operator state
    return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
}
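writeResultsOfCurrentCkpt holds the data files, delete files and referenced data files received during the current checkpoint. They are merged into one WriteResult, a DeltaManifests is created from it, and the serialized manifest information is returned.
The DeltaManifests is produced by FlinkManifestUtil.writeCompletedFiles():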
static DeltaManifests writeCompletedFiles(WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec) throws IOException {
    ManifestFile dataManifest = null;
    ManifestFile deleteManifest = null;

    // Data files, if any, go into their own manifest
    if (result.dataFiles() != null && result.dataFiles().length > 0) {
        dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
    }

    // Delete files, if any, go into a format-v2 delete manifest
    if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
        OutputFile deleteManifestFile = outputFileSupplier.get();
        ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
        try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
            for (DeleteFile deleteFile : result.deleteFiles()) {
                writer.add(deleteFile);
            }
        }
        deleteManifest = deleteManifestWriter.toManifestFile();
    }

    return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}
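As the write path above shows, data files and delete files are written into separate manifest files, and the resulting DeltaManifests (data manifest, delete manifest and referenced data files) is returned.
Finally, when Flink notifies the operator that the checkpoint has completed, the pending files are committed: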
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // Commit every pending checkpoint up to this one, unless it has already been committed
    if (checkpointId > this.maxCommittedCheckpointId) {
        this.commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, checkpointId);
        this.maxCommittedCheckpointId = checkpointId;
    }
}
private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long checkpointId) throws IOException {
    // All pending checkpoints up to and including checkpointId (a live view of deltaManifestsMap)
    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
    List<ManifestFile> manifests = Lists.newArrayList();
    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();

    for (Entry<Long, byte[]> e : pendingMap.entrySet()) {
        if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
            continue;  // checkpoint produced no files
        }
        DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
        pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io()));
        manifests.addAll(deltaManifests.manifests());
    }

    if (this.replacePartitions) {
        this.replacePartitions(pendingResults, newFlinkJobId, checkpointId);
    } else {
        this.commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
    }

    // Clearing the view also removes the committed entries from deltaManifestsMap
    pendingMap.clear();

    // The temporary Flink manifests are no longer needed; a failed delete is only logged
    for (ManifestFile manifest : manifests) {
        try {
            this.table.io().deleteFile(manifest.path());
        } catch (Exception e) {
            String details = MoreObjects.toStringHelper(this)
                .add("flinkJobId", newFlinkJobId)
                .add("checkpointId", checkpointId)
                .add("manifestPath", manifest.path())
                .toString();
            LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", details, e);
        }
    }
}
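Here the bytes serialized in snapshotState() are deserialized back into DeltaManifests. The files they list are collected into pendingResults, and the manifest files themselves are added to the manifests list so they can be deleted once the commit succeeds.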
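The interplay between notifyCheckpointComplete(), maxCommittedCheckpointId and headMap() can be modeled without Flink or Iceberg. In this hypothetical sketch, CommitUpToSketch records checkpoint ids instead of committing to a table. It shows two properties of the code above: one notification commits every pending checkpoint up to its id (useful because Flink may not deliver a completion notification for every checkpoint), and clearing the headMap view removes those entries from the backing map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of notifyCheckpointComplete() + commitUpToCheckpoint().
// Values are serialized manifests; an empty array marks a checkpoint that wrote nothing.
class CommitUpToSketch {
    static final byte[] EMPTY_MANIFEST_DATA = new byte[0];

    final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = new TreeMap<>();
    final List<Long> committedCheckpoints = new ArrayList<>();  // stands in for real Iceberg commits
    long maxCommittedCheckpointId = -1L;

    void notifyCheckpointComplete(long checkpointId) {
        // Ignore notifications already covered by an earlier commit
        if (checkpointId > maxCommittedCheckpointId) {
            commitUpToCheckpoint(checkpointId);
            maxCommittedCheckpointId = checkpointId;
        }
    }

    private void commitUpToCheckpoint(long checkpointId) {
        // headMap(key, true) is a live view of every pending checkpoint <= checkpointId
        NavigableMap<Long, byte[]> pendingMap = dataFilesPerCheckpoint.headMap(checkpointId, true);
        for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
            if (!Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
                committedCheckpoints.add(e.getKey());  // empty checkpoints are skipped
            }
        }
        // Clearing the view also removes these entries from dataFilesPerCheckpoint
        pendingMap.clear();
    }
}
```

For example, if the notification for checkpoint 1 never arrives, the notification for checkpoint 3 commits checkpoints 1 and 3 together (2 wrote nothing), and a late notification for checkpoint 2 is then ignored.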
The transaction is then committed according to replacePartitions (the overwrite flag passed in at creation time, false by default). By default, commitDeltaTxn() is called:
private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();

    if (deleteFilesNum == 0) {
        // No delete files: a single append covering all pending checkpoints
        AppendFiles appendFiles = this.table.newAppend();
        int numFiles = 0;
        for (WriteResult result : pendingResults.values()) {
            Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
            numFiles += result.dataFiles().length;
            Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
        }
        this.commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
    } else {
        // Delete files present: one RowDelta commit per checkpoint, in checkpoint order
        for (Entry<Long, WriteResult> e : pendingResults.entrySet()) {
            WriteResult result = e.getValue();
            RowDelta rowDelta = this.table.newRowDelta()
                .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
                .validateDeletedFiles();

            int numDataFiles = result.dataFiles().length;
            Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);

            int numDeleteFiles = result.deleteFiles().length;
            Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);

            this.commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey());
        }
    }
}
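If there are no delete files, an AppendFiles (appendFiles) is created with table.newAppend(); its implementation, MergeAppend, extends MergingSnapshotProducer. Otherwise a RowDelta (rowDelta) is created for each checkpoint with table.newRowDelta(); its implementation, BaseRowDelta, also extends MergingSnapshotProducer. Either way, the operation is committed as a new snapshot.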
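The choice between one append and several row deltas can be isolated in a small sketch. CommitPlanSketch.plan() is hypothetical: each pending checkpoint is reduced to a pair {dataFiles, deleteFiles}, and the method returns a description of each commit it would issue. Committing one RowDelta per checkpoint keeps checkpoints ordered, presumably so that equality deletes written in a later checkpoint still apply to data files added by an earlier one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical sketch of the decision in commitDeltaTxn(). Each value is {dataFiles, deleteFiles}
// for one checkpoint; the result describes each commit that would be performed.
class CommitPlanSketch {
    static List<String> plan(NavigableMap<Long, int[]> pendingResults, long checkpointId) {
        List<String> commits = new ArrayList<>();
        int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r[1]).sum();
        if (deleteFilesNum == 0) {
            // Append-only: one commit for all pending checkpoints, tagged with the latest checkpoint id
            int numFiles = pendingResults.values().stream().mapToInt(r -> r[0]).sum();
            commits.add("append files=" + numFiles + " ckpt=" + checkpointId);
        } else {
            // Deletes present: one commit per checkpoint, in ascending checkpoint order
            for (Map.Entry<Long, int[]> e : pendingResults.entrySet()) {
                commits.add("rowDelta data=" + e.getValue()[0] + " deletes=" + e.getValue()[1] + " ckpt=" + e.getKey());
            }
        }
        return commits;
    }
}
```

With checkpoints 5 and 6 holding 2 and 3 data files and no deletes, the plan is a single "append files=5 ckpt=6"; once checkpoint 7 adds delete files, it becomes three rowDelta commits for 5, 6 and 7.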
private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description, String newFlinkJobId, long checkpointId) {
    LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles, numDeleteFiles, this.table);
    // Record the checkpoint id and job id in the snapshot summary; they are read back on restore
    operation.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
    operation.set("flink.job-id", newFlinkJobId);

    long start = System.currentTimeMillis();
    operation.commit();
    long duration = System.currentTimeMillis() - start;
    LOG.info("Committed in {} ms", duration);
}
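The two summary properties set here are what make restore work: on recovery, initializeState() calls getMaxCommittedCheckpointId() to find the last checkpoint this job committed. The hypothetical helper below sketches that lookup under the assumption that snapshot summaries are visited from the newest snapshot backwards; plain maps stand in for Iceberg Snapshot objects.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: recover the last checkpoint committed by a Flink job from
// snapshot summaries ordered newest first. Maps stand in for Snapshot.summary().
class MaxCommittedSketch {
    static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
    static final String FLINK_JOB_ID = "flink.job-id";

    static long maxCommittedCheckpointId(List<Map<String, String>> summariesNewestFirst, String flinkJobId) {
        for (Map<String, String> summary : summariesNewestFirst) {
            // Skip snapshots written by other jobs (or by non-Flink writers)
            if (flinkJobId.equals(summary.get(FLINK_JOB_ID))) {
                String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
                if (value != null) {
                    return Long.parseLong(value);
                }
            }
        }
        return -1L;  // nothing committed by this job yet
    }
}
```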
operation.commit() calls SnapshotProducer.commit():
public void commit() {
    AtomicLong newSnapshotId = new AtomicLong(-1L);
    try {
        // Retry only on CommitFailedException, with exponential backoff configured by table properties
        Tasks.foreach(this.ops)
            .retry(this.base.propertyAsInt("commit.retry.num-retries", 4))
            .exponentialBackoff(
                this.base.propertyAsInt("commit.retry.min-wait-ms", 100),
                this.base.propertyAsInt("commit.retry.max-wait-ms", 60000),
                this.base.propertyAsInt("commit.retry.total-timeout-ms", 1800000),
                2.0)
            .onlyRetryOn(CommitFailedException.class)
            .run(taskOps -> {
                // apply() refreshes base, so each attempt builds on the latest metadata
                Snapshot newSnapshot = this.apply();
                newSnapshotId.set(newSnapshot.snapshotId());
                TableMetadata updated;
                if (this.stageOnly) {
                    updated = this.base.addStagedSnapshot(newSnapshot);
                } else {
                    updated = this.base.replaceCurrentSnapshot(newSnapshot);
                }
                if (updated != this.base) {
                    // Swap base -> updated; throws CommitFailedException if base is stale
                    taskOps.commit(this.base, updated.withUUID());
                }
            });
    } catch (RuntimeException e) {
        Exceptions.suppressAndThrow(e, this::cleanAll);
    }

    LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), this.getClass().getSimpleName());

    // Best-effort cleanup of manifests and manifest lists left behind by retried attempts
    try {
        Snapshot saved = this.ops.refresh().snapshot(newSnapshotId.get());
        if (saved != null) {
            this.cleanUncommitted(Sets.newHashSet(saved.allManifests()));
            for (String manifestList : this.manifestLists) {
                if (!saved.manifestListLocation().equals(manifestList)) {
                    this.deleteFile(manifestList);
                }
            }
        } else {
            LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
        }
    } catch (RuntimeException e) {
        LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e);
    }

    this.notifyListeners();
}
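The retry policy in commit() (retry only on CommitFailedException, with exponential backoff bounded by the commit.retry.* properties) can be sketched in plain Java. RetrySketch and its nested CommitFailedException are hypothetical stand-ins, the sleep is injected so the sketch can be tested, and the delay formula is an assumption; Iceberg's Tasks helper also enforces commit.retry.total-timeout-ms and may add jitter, neither of which is modeled.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical sketch of "retry only on CommitFailedException with exponential backoff".
// Any other RuntimeException escapes immediately, like onlyRetryOn(CommitFailedException.class).
class RetrySketch {
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    static <T> T runWithRetry(Supplier<T> attempt, int numRetries, long minWaitMs, long maxWaitMs,
                              double scaleFactor, LongConsumer sleeper) {
        int retries = 0;
        while (true) {
            try {
                return attempt.get();
            } catch (CommitFailedException e) {
                if (retries >= numRetries) {
                    throw e;                   // out of retries: give up
                }
                // Delay grows by scaleFactor per retry, capped at maxWaitMs
                long delay = (long) Math.min(minWaitMs * Math.pow(scaleFactor, retries), maxWaitMs);
                sleeper.accept(delay);         // injected so the sketch can run without sleeping
                retries++;
            }
        }
    }
}
```

With the defaults read in commit() (4 retries, 100 ms minimum, factor 2.0), a commit that keeps failing is attempted five times, with delays of 100, 200, 400 and 800 ms under this formula.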
SnapshotProducer.apply() computes the new manifests via apply(base), writes them into a manifest list file, and returns the new snapshot:
public Snapshot apply() {
    this.base = this.refresh();
    Long parentSnapshotId = this.base.currentSnapshot() != null ? this.base.currentSnapshot().snapshotId() : null;
    long sequenceNumber = this.base.nextSequenceNumber();

    // Validate against the current table state, then compute the manifests of the new snapshot
    this.validate(this.base);
    List<ManifestFile> manifests = this.apply(this.base);

    if (this.base.formatVersion() <= 1 && !this.base.propertyAsBoolean("write.manifest-lists.enabled", true)) {
        // Legacy v1 path without a manifest list: the snapshot references its manifests directly
        return new BaseSnapshot(this.ops.io(), this.snapshotId(), parentSnapshotId, System.currentTimeMillis(),
            this.operation(), this.summary(this.base), manifests);
    }

    OutputFile manifestList = this.manifestListPath();
    try (ManifestListWriter writer = ManifestLists.write(this.ops.current().formatVersion(), manifestList,
            this.snapshotId(), parentSnapshotId, sequenceNumber)) {
        // Track the manifest list so it can be cleaned up if this attempt does not win
        this.manifestLists.add(manifestList.location());

        // Resolve each manifest through manifestsWithMetadata, in parallel
        ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
        Tasks.range(manifestFiles.length)
            .stopOnFailure()
            .throwFailureWhenFinished()
            .executeWith(ThreadPools.getWorkerPool())
            .run(index -> {
                manifestFiles[index] = this.manifestsWithMetadata.get(manifests.get(index));
            });

        writer.addAll(Arrays.asList(manifestFiles));
    } catch (IOException e) {
        throw new RuntimeIOException(e, "Failed to write manifest list file");
    }

    return new BaseSnapshot(this.ops.io(), sequenceNumber, this.snapshotId(), parentSnapshotId,
        System.currentTimeMillis(), this.operation(), this.summary(this.base), manifestList.location());
}
The updated table metadata (updated) is then produced by TableMetadata.replaceCurrentSnapshot():
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
    // A snapshot that already exists only needs to become current
    if (this.snapshotsById.containsKey(snapshot.snapshotId())) {
        return this.setCurrentSnapshotTo(snapshot);
    }

    // In format v2, sequence numbers must strictly increase
    ValidationException.check(this.formatVersion == 1 || snapshot.sequenceNumber() > this.lastSequenceNumber,
        "Cannot add snapshot with sequence number %s older than last sequence number %s",
        snapshot.sequenceNumber(), this.lastSequenceNumber);

    List newSnapshots = ImmutableList.builder().addAll(this.snapshots).add(snapshot).build();
    List newSnapshotLog = ImmutableList.builder().addAll(this.snapshotLog)
        .add(new TableMetadata.SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
        .build();

    // Metadata is immutable: return a new TableMetadata with the snapshot appended and made current
    return new TableMetadata((InputFile) null, this.formatVersion, this.uuid, this.location,
        snapshot.sequenceNumber(), snapshot.timestampMillis(), this.lastColumnId, this.schema,
        this.defaultSpecId, this.specs, this.defaultSortOrderId, this.sortOrders, this.properties,
        snapshot.snapshotId(), newSnapshots, newSnapshotLog, this.addPreviousFile(this.file, this.lastUpdatedMillis));
}
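Two details of replaceCurrentSnapshot() matter for concurrent writers: TableMetadata is immutable, so the method returns a new object and leaves base untouched, and in format v2 a new snapshot must carry a higher sequence number than the table's last one. A simplified, hypothetical model (MetadataSketch tracks only snapshot ids and sequence numbers, and throws IllegalStateException where Iceberg uses ValidationException):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of TableMetadata.replaceCurrentSnapshot().
final class MetadataSketch {
    final int formatVersion;
    final long lastSequenceNumber;
    final Long currentSnapshotId;
    final List<Long> snapshotIds;

    MetadataSketch(int formatVersion, long lastSequenceNumber, Long currentSnapshotId, List<Long> snapshotIds) {
        this.formatVersion = formatVersion;
        this.lastSequenceNumber = lastSequenceNumber;
        this.currentSnapshotId = currentSnapshotId;
        this.snapshotIds = Collections.unmodifiableList(new ArrayList<>(snapshotIds));
    }

    MetadataSketch replaceCurrentSnapshot(long snapshotId, long sequenceNumber) {
        if (snapshotIds.contains(snapshotId)) {
            // Known snapshot: only move the current-snapshot pointer
            return new MetadataSketch(formatVersion, lastSequenceNumber, snapshotId, snapshotIds);
        }
        // Same rule as the ValidationException.check above, expressed as its negation
        if (formatVersion != 1 && sequenceNumber <= lastSequenceNumber) {
            throw new IllegalStateException("Cannot add snapshot with sequence number " + sequenceNumber
                + " older than last sequence number " + lastSequenceNumber);
        }
        List<Long> newSnapshots = new ArrayList<>(snapshotIds);
        newSnapshots.add(snapshotId);
        // A new object is returned; this instance is never modified
        return new MetadataSketch(formatVersion, sequenceNumber, snapshotId, newSnapshots);
    }
}
```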
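taskOps.commit(base, updated) then calls the table operations' commit(); for metastore-backed catalogs this is BaseMetastoreTableOperations.commit():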
public void commit(TableMetadata base, TableMetadata metadata) {
    // Optimistic concurrency: the commit must be based on the latest known metadata
    if (base != this.current()) {
        throw new CommitFailedException("Cannot commit: stale table metadata");
    } else if (base == metadata) {
        LOG.info("Nothing to commit.");
    } else {
        long start = System.currentTimeMillis();
        this.doCommit(base, metadata);
        this.deleteRemovedMetadataFiles(base, metadata);
        this.requestRefresh();
        LOG.info("Successfully committed to table {} in {} ms", this.tableName(), System.currentTimeMillis() - start);
    }
}
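Finally, HiveTableOperations.doCommit() performs the actual commit. (This path applies to tables in a Hive catalog; the Hadoop table loaded in the opening example goes through HadoopTableOperations instead.)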
protected void doCommit(TableMetadata base, TableMetadata metadata) {
    // Write the new metadata file first; it only becomes visible once the HMS pointer is swapped
    String newMetadataLocation = this.writeNewMetadata(metadata, this.currentVersion() + 1);
    boolean hiveEngineEnabled = hiveEngineEnabled(metadata, this.conf);
    boolean threw = true;
    boolean updateHiveTable = false;
    Optional<Long> lockId = Optional.empty();
    try {
        // Take a metastore lock on the table
        lockId = Optional.of(this.acquireLock());
        Table tbl = this.loadHmsTable();
        if (tbl != null) {
            // base == null means "create", which must not happen for an existing Iceberg table
            if (base == null && tbl.getParameters().get("metadata_location") != null) {
                throw new AlreadyExistsException("Table already exists: %s.%s", this.database, this.tableName);
            }
            updateHiveTable = true;
            LOG.debug("Committing existing table: {}", this.fullName);
        } else {
            tbl = this.newHmsTable();
            LOG.debug("Committing new table: {}", this.fullName);
        }

        tbl.setSd(this.storageDescriptor(metadata, hiveEngineEnabled));

        // Check that nobody else committed since base was read
        String metadataLocation = tbl.getParameters().get("metadata_location");
        String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
        if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
            throw new CommitFailedException("Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s",
                baseMetadataLocation, metadataLocation, this.database, this.tableName);
        }

        // Point the HMS table at the new metadata file and persist it
        this.setParameters(newMetadataLocation, tbl, hiveEngineEnabled);
        this.persistTable(tbl, updateHiveTable);
        threw = false;
    } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
        throw new AlreadyExistsException("Table already exists: %s.%s", this.database, this.tableName);
    } catch (TException | UnknownHostException e) {
        if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
            throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't exist, this probably happened when using embedded metastore or doesn't create a transactional meta table. To fix this, use an alternative metastore", e);
        }
        throw new RuntimeException(String.format("Metastore operation failed for %s.%s", this.database, this.tableName), e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted during commit", e);
    } finally {
        // Removes the new metadata file if the commit failed (threw == true) and always releases the lock
        this.cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId);
    }
}
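Stripped of Hive specifics, doCommit() is a lock-protected compare-and-swap on the metadata_location table parameter. The sketch below models that pattern in plain Java; PointerSwapSketch is hypothetical, a ReentrantLock stands in for the metastore lock and a HashMap for the HMS table parameters. A failed check throws CommitFailedException, the same exception type SnapshotProducer.commit() retries on.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the lock + check + swap pattern in HiveTableOperations.doCommit().
class PointerSwapSketch {
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    private final ReentrantLock lock = new ReentrantLock();          // stands in for the metastore lock
    final Map<String, String> tableParameters = new HashMap<>();    // stands in for HMS table parameters

    void commit(String baseMetadataLocation, String newMetadataLocation) {
        lock.lock();
        try {
            String current = tableParameters.get("metadata_location");
            // Someone else committed since our base was read (null == null for a brand-new table)
            if (!Objects.equals(baseMetadataLocation, current)) {
                throw new CommitFailedException("Base metadata location '" + baseMetadataLocation
                    + "' is not same as the current table metadata location '" + current + "'");
            }
            tableParameters.put("metadata_location", newMetadataLocation);
        } finally {
            lock.unlock();  // always released, like the finally block in doCommit()
        }
    }
}
```

Two writers that both read v1 and try to publish their own v2 cannot both succeed: the second one sees that metadata_location has moved and fails, after which the retry loop in SnapshotProducer.commit() rebuilds its snapshot on the new metadata.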
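Appendix: Flink task execution flow
Task lifecycle:
StreamTask is the base class of all streaming tasks. A task runs one or more StreamOperators (several when they are chained), and chained operators run synchronously in the same thread.
Execution: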
@Override
public final void invoke() throws Exception {
    try {
        beforeInvoke();

        // final check to exit early before starting to run
        if (canceled) {
            throw new CancelTaskException();
        }

        // let the task do its work
        runMailboxLoop();

        // if this left the run() method cleanly despite the fact that this was canceled,
        // make sure the "clean shutdown" is not attempted
        if (canceled) {
            throw new CancelTaskException();
        }

        afterInvoke();
    }
    catch (Exception invokeException) {
        try {
            cleanUpInvoke();
        }
        catch (Throwable cleanUpException) {
            throw (Exception) ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
        }
        throw invokeException;
    }
    cleanUpInvoke();
}
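beforeInvoke() does the initialization work, including extracting all the operators of the chain.
runMailboxLoop() runs the task's main processing loop.
afterInvoke() finishes the task.
Call hierarchy: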
-- invoke()
     |
     +----> Create basic utils (config, etc) and load the chain of operators
     +----> operators.setup()
     +----> task specific init()
     +----> initialize-operator-states()
     +----> open-operators()
     +----> run()
              --------------> mailboxProcessor.runMailboxLoop();
              --------------> StreamTask.processInput()
              --------------> StreamTask.inputProcessor.processInput()
              --------------> indirectly calls the operator's processElement() and processWatermark()
     +----> close-operators()
     +----> dispose-operators()
     +----> common cleanup
     +----> task specific cleanup()
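- Create the state backend, which provides state for all operators in the OperatorChain
- Load all operators in the OperatorChain
- Call setup() on every operator
- Perform task-specific initialization
- Call initializeState() on every operator to initialize its state
- Call open() on every operator
- Loop in run() to process data
- Call close() on every operator
- Call dispose() on every operator
- Perform common cleanup
- Perform task-specific cleanup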
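For a single operator, the hooks listed above fire in a fixed order. The hypothetical sketch below records that order for one LoggingOperator; it deliberately ignores chaining, checkpoints and watermarks, and none of its classes are Flink APIs.

```java
import java.util.List;

// Hypothetical sketch: the order in which a StreamTask drives the hooks of one operator.
// LoggingOperator is not a Flink class; it only records which hook ran.
class TaskLifecycleSketch {
    static class LoggingOperator {
        private final List<String> log;

        LoggingOperator(List<String> log) {
            this.log = log;
        }

        void setup() { log.add("setup"); }
        void initializeState() { log.add("initializeState"); }
        void open() { log.add("open"); }
        void processElement(String record) { log.add("processElement(" + record + ")"); }
        void close() { log.add("close"); }
        void dispose() { log.add("dispose"); }
    }

    static void invoke(LoggingOperator op, List<String> input) {
        // Initialization phase: setup, restore state, open
        op.setup();
        op.initializeState();
        op.open();
        // Processing phase (the mailbox loop): every record reaches processElement()
        for (String record : input) {
            op.processElement(record);
        }
        // Shutdown phase: close first, then dispose
        op.close();
        op.dispose();
    }
}
```

This is the order IcebergStreamWriter relies on: its TaskWriter is created in open(), before the first processElement() call.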
Iceberg learning resources
- https://iceberg.apache.org/#flink/
- https://www.dremio.com/apache-iceberg-an-architectural-look-under-the-covers/



