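One prerequisite for understanding StreamGraph is a basic grasp of DAGs (directed acyclic graphs). You should know what a DAG's vertices and edges are, and how a DAG is built from them.
The second prerequisite is Transformation. In Flink stream processing, every operator call is converted into a Transformation object and stored. The StreamGraph is then built from this series of Transformation objects.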
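As a refresher on vertices and edges, here is a minimal, hypothetical DAG in Java. SimpleDag is only an illustration name, not a Flink class. Vertices are integer ids, and edges point from upstream to downstream. topologicalOrder uses Kahn's algorithm, which yields a processing order only when the graph contains no cycle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal DAG: vertices are integer ids, edges go from upstream to downstream.
class SimpleDag {
    // Adjacency list: vertex id -> ids of its downstream vertices (insertion order kept).
    private final Map<Integer, List<Integer>> outEdges = new LinkedHashMap<>();

    void addVertex(int id) {
        if (outEdges.containsKey(id)) {
            throw new IllegalArgumentException("Duplicate vertex " + id);
        }
        outEdges.put(id, new ArrayList<>());
    }

    void addEdge(int from, int to) {
        if (!outEdges.containsKey(from) || !outEdges.containsKey(to)) {
            throw new IllegalArgumentException("Unknown vertex in edge " + from + " -> " + to);
        }
        outEdges.get(from).add(to);
    }

    // Kahn's algorithm: repeatedly emit vertices whose in-degree has dropped to zero.
    // If some vertices are never emitted, they lie on a cycle and the graph is not a DAG.
    List<Integer> topologicalOrder() {
        Map<Integer, Integer> inDegree = new HashMap<>();
        for (Integer v : outEdges.keySet()) {
            inDegree.putIfAbsent(v, 0);
            for (Integer w : outEdges.get(v)) {
                inDegree.merge(w, 1, Integer::sum);
            }
        }
        Deque<Integer> ready = new ArrayDeque<>();
        for (Integer v : outEdges.keySet()) {
            if (inDegree.get(v) == 0) {
                ready.add(v);
            }
        }
        List<Integer> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            int v = ready.poll();
            order.add(v);
            for (Integer w : outEdges.get(v)) {
                if (inDegree.merge(w, -1, Integer::sum) == 0) {
                    ready.add(w);
                }
            }
        }
        if (order.size() != outEdges.size()) {
            throw new IllegalStateException("Graph contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        // source(1) -> map(2) -> sink(3) and source(1) -> filter(4) -> sink(3)
        SimpleDag dag = new SimpleDag();
        for (int id = 1; id <= 4; id++) {
            dag.addVertex(id);
        }
        dag.addEdge(1, 2);
        dag.addEdge(2, 3);
        dag.addEdge(1, 4);
        dag.addEdge(4, 3);
        System.out.println(dag.topologicalOrder());
    }
}
```

Vertex 1 plays the role of a source and vertex 3 the role of a sink. Any order the method returns places every upstream vertex before its downstream vertices.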
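Transformation types:
- OneInputTransformation
A transformation with exactly one input, produced by operators such as map, filter and process.
- TwoInputTransformation
A transformation with two inputs. Operators on ConnectedStreams work on two streams at once and are converted into a TwoInputTransformation.
- SourceTransformation
env.addSource() creates a DataStreamSource, and the DataStreamSource constructor creates a SourceTransformation. (In newer Flink versions, such as the one whose code is quoted below, addSource() with a SourceFunction produces a LegacySourceTransformation instead. SourceTransformation is created by env.fromSource() for the new Source API, which is the path handled by SourceTransformationTranslator later in this article.)
- SinkTransformation
This works like SourceTransformation. Calling addSink on a DataStream creates a DataStreamSink object, which constructs a SinkTransformation at the same time. (In newer versions, addSink() likewise produces a LegacySinkTransformation, and SinkTransformation comes from sinkTo().)
- UnionTransformation
Calling union on a DataStream creates a UnionTransformation: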
public final DataStream<T> union(DataStream<T>... streams) {
    List<Transformation<T>> unionedTransforms = new ArrayList<>();
    unionedTransforms.add(this.transformation);

    for (DataStream<T> newStream : streams) {
        if (!getType().equals(newStream.getType())) {
            throw new IllegalArgumentException(
                    "Cannot union streams of different types: "
                            + getType()
                            + " and "
                            + newStream.getType());
        }
        unionedTransforms.add(newStream.getTransformation());
    }
    return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
}
- FeedbackTransformation
Calling iterate() on a DataStream creates an IterativeStream, and a FeedbackTransformation is created along with it:
protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
    super(
            dataStream.getExecutionEnvironment(),
            new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
    this.originalInput = dataStream;
    this.maxWaitTime = maxWaitTime;
    setBufferTimeout(dataStream.environment.getBufferTimeout());
}
- CoFeedbackTransformation
A CoFeedbackTransformation is created when ConnectedIterativeStreams is constructed:
public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {
private CoFeedbackTransformation<F> coFeedbackTransformation;
public ConnectedIterativeStreams(
DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
super(
input.getExecutionEnvironment(),
input,
new DataStream<>(
input.getExecutionEnvironment(),
new CoFeedbackTransformation<>(
input.getParallelism(), feedbackType, waitTime)));
this.coFeedbackTransformation =
(CoFeedbackTransformation<F>) getSecondInput().getTransformation();
}
// ... (remaining members of the class omitted)
}
- PartitionTransformation
Operators that control how data is routed, such as shuffle, forward, rebalance and keyBy, all produce a PartitionTransformation:
@PublicEvolving
public DataStream<T> shuffle() {
return setConnectionType(new ShufflePartitioner<T>());
}

protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
    return new DataStream<>(
            this.getExecutionEnvironment(),
            new PartitionTransformation<>(this.getTransformation(), partitioner));
}
- SideOutputTransformation
A SideOutputTransformation is created for side outputs, when a side output is retrieved with getSideOutput().
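For building the graph, these types differ mainly in how many upstream Transformations each one references. The sketch below is a deliberately simplified model, not Flink code. ToyTransformation and its subclasses are hypothetical, and each keeps only an id, a name and its inputs. The point is that the Transformation objects already encode the DAG's edges through their input references.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, heavily simplified stand-ins for Flink's Transformation classes.
abstract class ToyTransformation {
    private static int nextId = 1;
    final int id = nextId++; // every instance gets a fresh id
    final String name;

    ToyTransformation(String name) { this.name = name; }

    // Upstream transformations: these references later turn into edges.
    abstract List<ToyTransformation> getInputs();
}

// Source-like: no inputs.
class ToySource extends ToyTransformation {
    ToySource(String name) { super(name); }
    List<ToyTransformation> getInputs() { return Collections.emptyList(); }
}

// One-input (map, filter, process...): exactly one upstream.
class ToyOneInput extends ToyTransformation {
    final ToyTransformation input;
    ToyOneInput(String name, ToyTransformation input) { super(name); this.input = input; }
    List<ToyTransformation> getInputs() { return Collections.singletonList(input); }
}

// Two-input (operators on connected streams): two upstreams.
class ToyTwoInput extends ToyTransformation {
    final ToyTransformation first;
    final ToyTransformation second;
    ToyTwoInput(String name, ToyTransformation first, ToyTransformation second) {
        super(name);
        this.first = first;
        this.second = second;
    }
    List<ToyTransformation> getInputs() { return Arrays.asList(first, second); }
}

// Union: any number of upstreams, no user logic of its own.
class ToyUnion extends ToyTransformation {
    final List<ToyTransformation> inputs;
    ToyUnion(List<ToyTransformation> inputs) { super("Union"); this.inputs = new ArrayList<>(inputs); }
    List<ToyTransformation> getInputs() { return inputs; }
}

// Partition: one upstream plus a partitioner name, no user logic of its own.
class ToyPartition extends ToyTransformation {
    final ToyTransformation input;
    final String partitioner;
    ToyPartition(ToyTransformation input, String partitioner) {
        super("Partition");
        this.input = input;
        this.partitioner = partitioner;
    }
    List<ToyTransformation> getInputs() { return Collections.singletonList(input); }
}

class ToyTransformationDemo {
    public static void main(String[] args) {
        ToyTransformation source = new ToySource("source");
        ToyTransformation map = new ToyOneInput("map", source);
        ToyTransformation shuffled = new ToyPartition(map, "shuffle");
        // Following getInputs() from the last transformation recovers the whole chain.
        for (ToyTransformation t = shuffled; t != null;
                t = t.getInputs().isEmpty() ? null : t.getInputs().get(0)) {
            System.out.println(t.id + " " + t.name);
        }
    }
}
```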
We start the analysis from the execute method of StreamExecutionEnvironment:
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
final StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
return execute(streamGraph);
}
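execute calls getStreamGraph(), which creates a StreamGraph instance. The job name is then set on the graph, and the graph is submitted for execution. This means the logic that generates the StreamGraph lives in getStreamGraph(), so that is where we look next.
The getStreamGraph method: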
public StreamGraph getStreamGraph() {
return getStreamGraph(true);
}
public StreamGraph getStreamGraph(boolean clearTransformations) {
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
if (clearTransformations) {
transformations.clear();
}
return streamGraph;
}
The line getStreamGraphGenerator(transformations).generate(); builds a StreamGraphGenerator from the transformations list and then calls generate to produce the StreamGraph. That raises a question: when is the transformations list filled? Tracing the code shows that it is populated in the addOperator method of StreamExecutionEnvironment:
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
addOperator is called by the various operator methods, for example addSink:
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    // configure the type if needed
    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}
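The list bookkeeping described above can be modeled in a few lines: operators append their Transformation, and getStreamGraph() consumes the list and then clears it. ToyEnvironment is hypothetical, and strings stand in for Transformation objects. The real generate() step is reduced here to copying the list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of how the environment accumulates transformations.
class ToyEnvironment {
    private final List<String> transformations = new ArrayList<>();

    // Plays the role of StreamExecutionEnvironment#addOperator: append, reject null.
    void addOperator(String transformation) {
        if (transformation == null) {
            throw new NullPointerException("transformation must not be null.");
        }
        transformations.add(transformation);
    }

    // Plays the role of getStreamGraph(clearTransformations): build from the current
    // list, then optionally clear it so the next job starts from an empty list.
    List<String> getStreamGraph(boolean clearTransformations) {
        List<String> graph = Collections.unmodifiableList(new ArrayList<>(transformations));
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    // The no-argument variant clears by default, as in the code quoted above.
    List<String> getStreamGraph() {
        return getStreamGraph(true);
    }

    int pendingTransformations() {
        return transformations.size();
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        env.addOperator("Map");
        env.addOperator("Sink");
        System.out.println(env.getStreamGraph() + ", pending=" + env.pendingTransformations());
    }
}
```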
Now back to the generate method that produces the StreamGraph:
public StreamGraph generate() {
// Apply the various configuration settings
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
streamGraph.setEnableCheckpointsAfterTasksFinish(
configuration.get(
ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
shouldExecuteInBatchMode = shouldExecuteInBatchMode();
configureStreamGraph(streamGraph);
// Cache of transformations that have already been translated
alreadyTransformed = new HashMap<>();
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);
setFineGrainedGlobalStreamExchangeMode(streamGraph);
for (StreamNode node : streamGraph.getStreamNodes()) {
if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
for (StreamEdge edge : node.getInEdges()) {
edge.setSupportsUnalignedCheckpoints(false);
}
}
}
final StreamGraph builtStreamGraph = streamGraph;
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
Next, let's trace the transform method:
private Collection<Integer> transform(Transformation<?> transform) {
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);

    if (transform.getMaxParallelism() <= 0) {
        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }

    // Slot sharing group resource configuration of the Transformation
    transform
            .getSlotSharingGroup()
            .ifPresent(
                    slotSharingGroup -> {
                        final ResourceSpec resourceSpec =
                                SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
                        if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
                            slotSharingGroupResources.compute(
                                    slotSharingGroup.getName(),
                                    (name, profile) -> {
                                        if (profile == null) {
                                            return ResourceProfile.fromResourceSpec(
                                                    resourceSpec, MemorySize.ZERO);
                                        } else if (!ResourceProfile.fromResourceSpec(
                                                        resourceSpec, MemorySize.ZERO)
                                                .equals(profile)) {
                                            throw new IllegalArgumentException(
                                                    "The slot sharing group "
                                                            + slotSharingGroup.getName()
                                                            + " has been configured with two different resource spec.");
                                        } else {
                                            return profile;
                                        }
                                    });
                        }
                    });

    // Check the output type
    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();

    // Look up the TransformationTranslator registered for this Transformation's class;
    // each Transformation type is translated by its own translator
    @SuppressWarnings("unchecked")
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>)
                    translatorMap.get(transform.getClass());

    Collection<Integer> transformedIds;
    if (translator != null) {
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    return transformedIds;
}
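At its core, transform is a memoized depth-first walk. alreadyTransformed ensures that a Transformation shared by several downstream operators is translated only once. The translator is chosen by the Transformation's class, with legacyTransform as the fallback. In Flink, the recursion into the inputs happens inside translate (through getParentInputIds). The hypothetical sketch below folds that recursion into transform itself so the whole pattern fits in one method. Node, SourceNode, MapNode and ToyGenerator are illustration names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a Transformation: an id plus its upstream inputs.
class Node {
    final int id;
    final List<Node> inputs;

    Node(int id, Node... inputs) {
        this.id = id;
        this.inputs = Arrays.asList(inputs);
    }
}

class SourceNode extends Node {
    SourceNode(int id) { super(id); }
}

class MapNode extends Node {
    MapNode(int id, Node input) { super(id, input); }
}

class ToyGenerator {
    // Like alreadyTransformed: node -> ids it was translated into.
    private final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();
    // Like translatorMap: the translator is looked up by the node's concrete class.
    private final Map<Class<?>, Function<Node, Collection<Integer>>> translators = new HashMap<>();
    // Order in which nodes were actually translated (for inspection).
    final List<Integer> translationOrder = new ArrayList<>();

    ToyGenerator() {
        Function<Node, Collection<Integer>> simple = this::record;
        translators.put(SourceNode.class, simple);
        translators.put(MapNode.class, simple);
    }

    Collection<Integer> transform(Node node) {
        if (alreadyTransformed.containsKey(node)) {
            return alreadyTransformed.get(node); // shared upstream: reuse the earlier result
        }
        // Depth-first: every input is translated before the node that consumes it.
        for (Node input : node.inputs) {
            transform(input);
        }
        Function<Node, Collection<Integer>> translator = translators.get(node.getClass());
        Collection<Integer> ids = translator != null ? translator.apply(node) : legacyTransform(node);
        alreadyTransformed.put(node, ids);
        return ids;
    }

    // Fallback for classes without a registered translator (stand-in for legacyTransform).
    private Collection<Integer> legacyTransform(Node node) {
        return record(node);
    }

    private Collection<Integer> record(Node node) {
        translationOrder.add(node.id);
        return Collections.singleton(node.id);
    }

    public static void main(String[] args) {
        ToyGenerator generator = new ToyGenerator();
        Node source = new SourceNode(1);
        // Diamond: both maps read from the same source and meet in node 4 (no translator).
        Node sink = new Node(4, new MapNode(2, source), new MapNode(3, source));
        generator.transform(sink);
        System.out.println(generator.translationOrder);
    }
}
```

In the diamond, two paths reach the source, but the source is still translated only once.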
Then comes the translate method:
private Collection<Integer> translate(
        final TransformationTranslator<?, Transformation<?>> translator,
        final Transformation<?> transform) {
    checkNotNull(translator);
    checkNotNull(transform);

    final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // Determine the Transformation's slot sharing group:
    // 1. if a group has been set, use it;
    // 2. if not, use the group of the upstream inputs;
    // 3. if the inputs belong to different groups, use the default group.
    final String slotSharingGroup =
            determineSlotSharingGroup(
                    transform.getSlotSharingGroup().isPresent()
                            ? transform.getSlotSharingGroup().get().getName()
                            : null,
                    allInputIds.stream()
                            .flatMap(Collection::stream)
                            .collect(Collectors.toList()));

    final TransformationTranslator.Context context =
            new ContextImpl(this, streamGraph, slotSharingGroup, configuration);

    return shouldExecuteInBatchMode
            ? translator.translateForBatch(transform, context)
            : translator.translateForStreaming(transform, context);
}
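translate contains an important piece of logic that decides the slot sharing group:
- If a slot sharing group has been set, that group is used.
- If none has been set, the group of the upstream inputs is used.
- If the inputs belong to different groups, the default group is used.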
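The three rules fit in one small function. This is a sketch of the rules as listed, not Flink's own method. The groupByNodeId map and the "default" group name are assumptions made for illustration. The sketch also assumes that a node without inputs and without an explicit group, such as a source, ends up in the default group.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the slot-sharing-group rules described above.
class SlotSharingGroups {
    // Assumed name of the default group for this sketch.
    static final String DEFAULT_GROUP = "default";

    // Group already assigned to each upstream node id (hypothetical bookkeeping).
    private final Map<Integer, String> groupByNodeId = new HashMap<>();

    void assign(int nodeId, String group) {
        groupByNodeId.put(nodeId, group);
    }

    String determine(String specifiedGroup, Collection<Integer> inputIds) {
        // Rule 1: an explicitly specified group wins.
        if (specifiedGroup != null) {
            return specifiedGroup;
        }
        // Rule 2: inherit the inputs' group as long as they all agree.
        String inputGroup = null;
        for (int id : inputIds) {
            String candidate = groupByNodeId.get(id);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                // Rule 3: the inputs disagree, so fall back to the default group.
                return DEFAULT_GROUP;
            }
        }
        // No inputs (for example a source): use the default group.
        return inputGroup == null ? DEFAULT_GROUP : inputGroup;
    }

    public static void main(String[] args) {
        SlotSharingGroups groups = new SlotSharingGroups();
        groups.assign(1, "a");
        groups.assign(2, "b");
        System.out.println(groups.determine(null, Arrays.asList(1, 2)));
    }
}
```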
After that, translate builds the TransformationTranslator context and runs the translator in either streaming or batch mode.
For example, in streaming mode:
@Override
public final Collection<Integer> translateForStreaming(
final T transformation, final Context context) {
checkNotNull(transformation);
checkNotNull(context);
final Collection<Integer> transformedIds =
translateForStreamingInternal(transformation, context);
configure(transformation, context);
return transformedIds;
}
translateForStreamingInternal is an abstract method. Each Transformation type is handled by its own implementation.
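The translator classes follow the template-method pattern. A final entry point performs the shared checks and post-processing, and it delegates the type-specific part to an abstract method such as translateForStreamingInternal. The hypothetical sketch below reduces this to plain Java. A StringBuilder stands in for the translator Context, and configure only records that it ran. The batch variant passes false for progressive watermarks; that is an assumption of this sketch, since only the streaming call with true is quoted in the source snippet.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

// Hypothetical reduction of the translator template-method structure.
abstract class ToyTranslator<T> {
    // Final entry points: shared checks and post-processing live here.
    final Collection<Integer> translateForStreaming(T transformation, StringBuilder log) {
        Objects.requireNonNull(transformation);
        Objects.requireNonNull(log);
        Collection<Integer> ids = translateForStreamingInternal(transformation, log);
        configure(transformation, log);
        return ids;
    }

    final Collection<Integer> translateForBatch(T transformation, StringBuilder log) {
        Objects.requireNonNull(transformation);
        Objects.requireNonNull(log);
        Collection<Integer> ids = translateForBatchInternal(transformation, log);
        configure(transformation, log);
        return ids;
    }

    // Type-specific work supplied by each subclass.
    protected abstract Collection<Integer> translateForStreamingInternal(T transformation, StringBuilder log);

    protected abstract Collection<Integer> translateForBatchInternal(T transformation, StringBuilder log);

    // Shared post-processing step (stand-in for configure); here it only logs.
    private void configure(T transformation, StringBuilder log) {
        log.append("configured;");
    }
}

// Source-like translator: both modes share one internal method and differ by a flag.
class ToySourceTranslator extends ToyTranslator<Integer> {
    @Override
    protected Collection<Integer> translateForStreamingInternal(Integer id, StringBuilder log) {
        return translateInternal(id, log, true);
    }

    @Override
    protected Collection<Integer> translateForBatchInternal(Integer id, StringBuilder log) {
        return translateInternal(id, log, false);
    }

    private Collection<Integer> translateInternal(Integer id, StringBuilder log, boolean emitProgressiveWatermarks) {
        log.append("source ").append(id).append(" watermarks=").append(emitProgressiveWatermarks).append(';');
        return Collections.singleton(id);
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        new ToySourceTranslator().translateForStreaming(7, log);
        System.out.println(log);
    }
}
```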
Let's look at the concrete implementation in SourceTransformationTranslator:
@Override
protected Collection<Integer> translateForStreamingInternal(
final SourceTransformation<OUT, SplitT, EnumChkT> transformation,
final Context context) {
return translateInternal(transformation, context, true /* emit progressive watermarks */);
}
private Collection<Integer> translateInternal(
        final SourceTransformation<OUT, SplitT, EnumChkT> transformation,
        final Context context,
        boolean emitProgressiveWatermarks) {
    checkNotNull(transformation);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // Create the SourceOperator factory
    SourceOperatorFactory<OUT> operatorFactory =
            new SourceOperatorFactory<>(
                    transformation.getSource(),
                    transformation.getWatermarkStrategy(),
                    emitProgressiveWatermarks);

    // Set the operator chaining strategy
    operatorFactory.setChainingStrategy(transformation.getChainingStrategy());

    // Add the source to the streamGraph
    streamGraph.addSource(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            null,
            transformation.getOutputType(),
            "Source: " + transformation.getName());

    // Set the parallelism
    final int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();

    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
    return Collections.singleton(transformationId);
}
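The parallelism fallback at the end of this method can be isolated into a tiny helper. The rule is to keep the Transformation's own parallelism unless it still holds the "not set" sentinel, and otherwise to use the job-wide value from ExecutionConfig. ParallelismResolver is hypothetical, and its -1 constant is assumed to match ExecutionConfig.PARALLELISM_DEFAULT.

```java
// Hypothetical helper isolating the parallelism fallback used by the translators.
class ParallelismResolver {
    // Stand-in for ExecutionConfig.PARALLELISM_DEFAULT ("not set on this operator").
    static final int PARALLELISM_DEFAULT = -1;

    static int resolve(int transformationParallelism, int jobParallelism) {
        return transformationParallelism != PARALLELISM_DEFAULT
                ? transformationParallelism
                : jobParallelism;
    }

    public static void main(String[] args) {
        // An operator with an explicit parallelism keeps it; an unset one inherits the job value.
        System.out.println(resolve(4, 8) + " " + resolve(PARALLELISM_DEFAULT, 8));
    }
}
```

The same fallback appears again in the OneInputTransformation translator further below.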
Next, let's trace the addSource method:
public <IN, OUT> void addSource(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        SourceOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    addOperator(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            inTypeInfo,
            outTypeInfo,
            operatorName,
            SourceOperatorStreamTask.class);
    sources.add(vertexID);
}
addSource calls addOperator and then adds the node ID to the StreamGraph's sources field.
private <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName,
        Class<? extends AbstractInvokable> invokableClass) {

    addNode(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            invokableClass,
            operatorFactory,
            operatorName);
    setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));

    if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
        // sets the output type which must be know at StreamGraph creation time
        operatorFactory.setOutputType(outTypeInfo, executionConfig);
    }

    if (operatorFactory.isInputTypeConfigurable()) {
        operatorFactory.setInputType(inTypeInfo, executionConfig);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Vertex: {}", vertexID);
    }
}
addOperator in turn calls the addNode method:
protected StreamNode addNode(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperatorFactory<?> operatorFactory,
String operatorName) {
if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
StreamNode vertex =
new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
vertexClass);
streamNodes.put(vertexID, vertex);
return vertex;
}
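In the end, the node information is wrapped in a StreamNode, which is then put into the streamNodes map. Adding this source node does not involve any edges. Let's take a brief look at how a OneInputTransformation is added to the StreamGraph: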
protected Collection<Integer> translateInternal(
        final Transformation<OUT> transformation,
        final StreamOperatorFactory<OUT> operatorFactory,
        final TypeInformation<IN> inputType,
        @Nullable final KeySelector<IN, ?> stateKeySelector,
        @Nullable final TypeInformation<?> stateKeyType,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(operatorFactory);
    checkNotNull(inputType);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            inputType,
            transformation.getOutputType(),
            transformation.getName());

    if (stateKeySelector != null) {
        TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
    }

    int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found "
                    + parentTransformations.size());

    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    return Collections.singleton(transformationId);
}
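The loop at the end adds one edge for every StreamNode id that the single parent resolves to. Usually the parent resolves to exactly one id, but a parent without a node of its own resolves to several. In Flink, for example, a union does not create a StreamNode and instead resolves to the node ids of its inputs, so a map placed after a union gets one incoming edge per unioned stream. The hypothetical sketch below models only this id resolution and the exactly-one-input check. ToyOneInputGraph and its methods are illustration names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of how a one-input operator is wired to its parent's node ids.
class ToyOneInputGraph {
    // transformation name -> StreamNode ids it resolved to (like context.getStreamNodeIds)
    final Map<String, Collection<Integer>> resolvedIds = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    // Physical operator: gets a node id of its own.
    void addPhysical(String name, int nodeId) {
        resolvedIds.put(name, Collections.singleton(nodeId));
    }

    // Union-like transformation: no node of its own, resolves to its inputs' ids.
    void addUnion(String name, String... inputs) {
        List<Integer> ids = new ArrayList<>();
        for (String input : inputs) {
            ids.addAll(resolvedIds.get(input));
        }
        resolvedIds.put(name, ids);
    }

    // One-input operator: exactly one parent, one edge per node id that parent resolves to.
    void addOneInput(String name, int nodeId, List<String> parents) {
        if (parents.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one input transformation but found " + parents.size());
        }
        for (Integer inputId : resolvedIds.get(parents.get(0))) {
            edges.add(inputId + "->" + nodeId);
        }
        addPhysical(name, nodeId);
    }

    public static void main(String[] args) {
        ToyOneInputGraph graph = new ToyOneInputGraph();
        graph.addPhysical("source1", 1);
        graph.addPhysical("source2", 2);
        graph.addUnion("union", "source1", "source2");
        graph.addOneInput("map", 3, Arrays.asList("union"));
        System.out.println(graph.edges);
    }
}
```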
The AbstractOneInputTransformationTranslator.translateInternal method above ends by calling streamGraph.addEdge, which looks like this:
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
null,
new ArrayList<String>(),
null,
null);
}
private void addEdgeInternal(
Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
StreamExchangeMode exchangeMode) {
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
null,
outputTag,
exchangeMode);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
exchangeMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
outputNames,
outputTag,
exchangeMode);
} else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
if (partitioner == null
&& upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
// ... (the rest of addEdgeInternal, which creates the StreamEdge, is omitted in this excerpt)
}
}
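Adding an edge also decides how data is distributed from upstream to downstream. The OneInputTransformation addEdge path shows an important rule: if no partitioner has been specified and the upstream and downstream operators have the same parallelism, data is sent with forward partitioning. Otherwise, rebalance is used.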
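Isolated from the surrounding code, the default choice is a simple two-way branch. This sketch models partitioners as strings, and PartitionerChoice is hypothetical. Flink itself creates ForwardPartitioner or RebalancePartitioner instances.

```java
// Hypothetical sketch of the default partitioner choice made when an edge is added.
class PartitionerChoice {
    static String choose(String explicitPartitioner, int upstreamParallelism, int downstreamParallelism) {
        if (explicitPartitioner != null) {
            return explicitPartitioner; // a partitioner the user asked for is kept as-is
        }
        // No partitioner given: forward when the parallelism matches, rebalance otherwise.
        return upstreamParallelism == downstreamParallelism ? "forward" : "rebalance";
    }

    public static void main(String[] args) {
        System.out.println(choose(null, 4, 4) + " " + choose(null, 4, 2) + " " + choose("hash", 4, 2));
    }
}
```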
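Virtual nodes also come into play here. What is a virtual node? Side outputs and partitioning operations (and, in older Flink versions, select) do not require user-supplied processing logic, so these transformations are turned into virtual nodes. Strictly speaking, a virtual node is not a StreamNode and contains no physical transformation logic. Virtual nodes do not appear in the StreamGraph's processing flow. When an edge is added and the upstream node is virtual, the upstream node is looked up recursively until a non-virtual node is found, and only then is the edge added. A virtual node is attached to a non-virtual node through its originalId property.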
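The recursive lookup can be sketched with a single map from a virtual id to the node it is attached to. VirtualNodeResolver, Virtual, and the string partitioners and tags are hypothetical. The sketch keeps only two ideas from addEdgeInternal. First, a virtual id is replaced by its original id. Second, the partitioner or output tag stored on the virtual node is applied unless the caller already supplied one. An "unset" partitioner in the result means the forward/rebalance default would still be chosen afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of virtual (side-output / partition) nodes during edge creation.
class VirtualNodeResolver {
    static final class Virtual {
        final int originalId;     // the node this virtual node is attached to
        final String partitioner; // set for partition nodes, null otherwise
        final String outputTag;   // set for side-output nodes, null otherwise

        Virtual(int originalId, String partitioner, String outputTag) {
            this.originalId = originalId;
            this.partitioner = partitioner;
            this.outputTag = outputTag;
        }
    }

    private final Map<Integer, Virtual> virtualNodes = new HashMap<>();

    void addVirtualSideOutput(int virtualId, int originalId, String outputTag) {
        virtualNodes.put(virtualId, new Virtual(originalId, null, outputTag));
    }

    void addVirtualPartition(int virtualId, int originalId, String partitioner) {
        virtualNodes.put(virtualId, new Virtual(originalId, partitioner, null));
    }

    // Walks up through virtual nodes until a physical one is reached, then describes the edge.
    String addEdge(int upstreamId, int downstreamId, String partitioner, String outputTag) {
        Virtual virtual = virtualNodes.get(upstreamId);
        if (virtual != null) {
            // Values supplied by the caller win; otherwise take the virtual node's.
            String p = partitioner != null ? partitioner : virtual.partitioner;
            String t = outputTag != null ? outputTag : virtual.outputTag;
            return addEdge(virtual.originalId, downstreamId, p, t);
        }
        return upstreamId + "->" + downstreamId
                + " [" + (partitioner == null ? "unset" : partitioner)
                + ", " + (outputTag == null ? "no tag" : outputTag) + "]";
    }

    public static void main(String[] args) {
        VirtualNodeResolver resolver = new VirtualNodeResolver();
        resolver.addVirtualSideOutput(10, 1, "late");      // side output of physical node 1
        resolver.addVirtualPartition(11, 10, "rebalance"); // rebalance applied to that side output
        System.out.println(resolver.addEdge(11, 5, null, null));
    }
}
```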
Reference: https://www.jianshu.com/p/138370b6b820



