
Flink Job Submission Source Code Analysis (2): Generating the StreamGraph

Versions

flink: release-1.14
os: ubuntu 16.04
IDE: IDEA
Overview of the WordCount Source and Execution Flow

The previous post, Flink Job Submission Source Code Analysis (1), showed that the callMainMethod method in flink-client uses reflection to invoke the entry class of the user code. This post digs into what that user code actually does.

We use the bundled WordCount.jar as the example. The run command is:

bin/flink run -t remote -d ./examples/streaming/WordCount.jar

The WordCount code is as follows:

 public static void main(String[] args) throws Exception {

        // Checking input parameters
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(value -> value.f0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
        // execute program
        env.execute("Streaming WordCount");
    }

In the code above, env.execute("Streaming WordCount") delegates to the concrete ExecutionEnvironment, here a StreamExecutionEnvironment, to submit the job. As the code below shows, the logic is: generate the StreamGraph, then execute it.

//StreamExecutionEnvironment 
public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        final StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        return execute(streamGraph);
    }
Obtaining the StreamExecutionEnvironment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

First, an execution environment (ExecutionEnvironment) is obtained:

//StreamExecutionEnvironment

public static StreamExecutionEnvironment getExecutionEnvironment() {
    return getExecutionEnvironment(new Configuration());
}

The environment is obtained through a StreamExecutionEnvironmentFactory; the contextEnvironmentFactory here was initialized by the client before the user code ran.

//StreamExecutionEnvironment

public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration))
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
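The fallback chain above, prefer the thread-local factory, then the static context factory installed by the client, else create a local environment, can be sketched in a few lines of simplified Python. All names below are illustrative stand-ins, not Flink API:

```python
# Simplified sketch of Utils.resolveFactory(...).map(...).orElseGet(...):
# prefer the thread-local factory, then the static one, otherwise fall
# back to creating a local environment.

def get_execution_environment(thread_local_factory, context_factory, config):
    factory = thread_local_factory if thread_local_factory is not None else context_factory
    if factory is not None:
        # factory.createExecutionEnvironment(configuration)
        return factory(config)
    # StreamExecutionEnvironment.createLocalEnvironment(configuration)
    return ("LocalEnvironment", config)

# No factory installed (e.g. running in the IDE) -> local environment:
env = get_execution_environment(None, None, {"parallelism.default": 1})

# A client-installed factory wins over the local fallback:
remote = get_execution_environment(None, lambda cfg: ("ContextEnvironment", cfg), {})
```

This mirrors why the same user jar produces a remote-submitting environment when launched via `bin/flink run`, but a local environment when launched directly from the IDE.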

[figure: stream-1.png]

When the WordCount code reaches env.execute("Streaming WordCount"), you can see that the text DataStream contains a LegacySourceTransformation:

[figure: stream-2.png]

while counts contains a ReduceTransformation:

[figure: stream-3.png]

The StreamGraph is obtained in the line final StreamGraph streamGraph = getStreamGraph();. Stepping into getStreamGraph():

//StreamExecutionEnvironment
@Internal
public StreamGraph getStreamGraph() {
	return getStreamGraph(true);
}
//after the StreamGraph has been generated, the StreamExecutionEnvironment's list of transformations is cleared
@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
	final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
	if (clearTransformations) {
		transformations.clear();
	}
	return streamGraph;
}

private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }
    return new StreamGraphGenerator(transformations, config, checkpointCfg, configuration)
            .setStateBackend(defaultStateBackend)
            .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
            .setSavepointDir(defaultSavepointDirectory)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout)
            .setSlotSharingGroupResource(slotSharingGroupResources);
}

Ultimately the StreamGraphGenerator builds the StreamGraph. Printing the transformations to be converted shows that this job contains 3 Transformations:

[figure: stream-4.png]

Overview of the StreamGraph Generation Flow

First, a tree of Transformations is built up in the env and stored in List<Transformation<?>> transformations; as shown below, it contains 3 Transformations. The generator then traverses them:

OneInputTransformation:

    Its input is the LegacySourceTransformation, whose translation produces the Source: Collection Source StreamNode. The OneInputTransformation itself then produces the Flat Map StreamNode, and a StreamEdge (Source: Collection Source-1_Flat Map-2_0_FORWARD_0) is added to connect the upstream Source: Collection Source to Flat Map. Since both ends have the same parallelism and no partitioner was specified, the partitioning is FORWARD.

ReduceTransformation:

    Its input is a PartitionTransformation, which does not produce a StreamNode; it only registers a virtual partition node in the StreamGraph's virtualPartitionNodes map. The ReduceTransformation then produces the Keyed Aggregation StreamNode. When adding the StreamEdge to its upstream, the generator finds that the upstream is a virtual partition node, looks up the real upstream StreamNode (Flat Map) in virtualPartitionNodes, and creates StreamEdge (Flat Map-2_Keyed Aggregation-4_0_HASH_0) connecting Flat Map and Keyed Aggregation, with the partitioning explicitly set to HASH.

LegacySinkTransformation:

    Its input is the ReduceTransformation, whose StreamNode already exists. The LegacySinkTransformation produces the Sink: Print to Std. Out StreamNode, and StreamEdge (Keyed Aggregation-4_Sink: Print to Std. Out-5_0_FORWARD_0) is added to connect Keyed Aggregation to Sink: Print to Std. Out. Again, matching parallelism and no explicit partitioner mean FORWARD partitioning.
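The outcome of the traversal above can be condensed into a tiny sketch (simplified Python, not Flink code; node names follow the debugger screenshots):

```python
# The three Transformations of WordCount boil down to 4 StreamNodes and
# 3 StreamEdges. The PartitionTransformation contributes no node of its
# own, only the HASH partitioner on the edge into Keyed Aggregation.

nodes = [
    "Source: Collection Source-1",
    "Flat Map-2",
    "Keyed Aggregation-4",
    "Sink: Print to Std. Out-5",
]

# (upstream, downstream, partitioner)
edges = [
    ("Source: Collection Source-1", "Flat Map-2", "FORWARD"),
    ("Flat Map-2", "Keyed Aggregation-4", "HASH"),
    ("Keyed Aggregation-4", "Sink: Print to Std. Out-5", "FORWARD"),
]

# Every edge endpoint must be a known StreamNode:
assert all(u in nodes and d in nodes for u, d, _ in edges)
```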

[figure: stream-graph.png]

The final streamGraph is:

[figure: stream-graph1.png]

Tracing StreamGraph Generation in the Source: Main Flow

The main steps are:

    1. Initialize and configure the StreamGraph
    2. Traverse all the Transformations and translate each one
//StreamGraphGenerator
public StreamGraph generate() {
    	//1. Initialize and configure the StreamGraph
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        streamGraph.setEnableCheckpointsAfterTasksFinish(
                configuration.get(
                        ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
        shouldExecuteInBatchMode = shouldExecuteInBatchMode();
        configureStreamGraph(streamGraph);
		//used to record Transformations that have already been translated
        alreadyTransformed = new HashMap<>();
		//2. Translate each transformation
        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }
		
        streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);

        setFineGrainedGlobalStreamExchangeMode(streamGraph);

        for (StreamNode node : streamGraph.getStreamNodes()) {
            if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                for (StreamEdge edge : node.getInEdges()) {
                    edge.setSupportsUnalignedCheckpoints(false);
                }
            }
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

The core logic is in transform(transformation).

//StreamGraphGenerator
private Collection<Integer> transform(Transformation<?> transform) {
	//1. If this transformation has already been translated, return its ids directly; this check is needed because the graph may contain loops
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);
    //2. Set the max parallelism
    if (transform.getMaxParallelism() <= 0) {

        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }
	//3. Set the slot sharing group
    transform
            .getSlotSharingGroup()
            .ifPresent(
                    slotSharingGroup -> {
                        final ResourceSpec resourceSpec =
                                SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
                        if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
                            slotSharingGroupResources.compute(
                                    slotSharingGroup.getName(),
                                    (name, profile) -> {
                                        if (profile == null) {
                                            return ResourceProfile.fromResourceSpec(
                                                    resourceSpec, MemorySize.ZERO);
                                        } else if (!ResourceProfile.fromResourceSpec(
                                                        resourceSpec, MemorySize.ZERO)
                                                .equals(profile)) {
                                            throw new IllegalArgumentException(
                                                    "The slot sharing group "
                                                            + slotSharingGroup.getName()
                                                            + " has been configured with two different resource spec.");
                                        } else {
                                            return profile;
                                        }
                                    });
                        }
                    });
	
    //4. Call getOutputType(), which throws if the output type could not be inferred
    transform.getOutputType();

    //5. Dispatch to the concrete translator
    @SuppressWarnings("unchecked")
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>)
                    translatorMap.get(transform.getClass());

    Collection<Integer> transformedIds;
    if (translator != null) {
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    return transformedIds;
}

The steps are:

    1. If the transformation has already been translated, return its ids directly (the graph may contain loops)
    2. Set the max parallelism
    3. Set the SlotSharingGroup
    4. Call getOutputType(), which throws if the output type could not be inferred
    5. Dispatch to the concrete translator; the built-in translators are all kept in translatorMap, shown below
//StreamGraphGenerator
static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    tmp.put(
            KeyedBroadcastStateTransformation.class,
            new KeyedBroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}
    6. Record the translated transformation in alreadyTransformed

The dispatch to the concrete translator looks like this:

//StreamGraphGenerator
private Collection<Integer> translate(
        final TransformationTranslator<?, Transformation<?>> translator,
        final Transformation<?> transform) {
    checkNotNull(translator);
    checkNotNull(transform);
	
    final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    final String slotSharingGroup =
            determineSlotSharingGroup(
                    transform.getSlotSharingGroup().isPresent()
                            ? transform.getSlotSharingGroup().get().getName()
                            : null,
                    allInputIds.stream()
                            .flatMap(Collection::stream)
                            .collect(Collectors.toList()));

    final TransformationTranslator.Context context =
            new ContextImpl(this, streamGraph, slotSharingGroup, configuration);

    return shouldExecuteInBatchMode
            ? translator.translateForBatch(transform, context)
            : translator.translateForStreaming(transform, context);
}
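The recursion above, translate the parents first via getParentInputIds, then yourself, memoizing results in alreadyTransformed, can be sketched in simplified Python (structure only; names are hypothetical, not Flink API):

```python
# Simplified sketch of StreamGraphGenerator.transform()/translate():
# parents are translated first, and results are memoized so that a
# transformation reached more than once is translated only once.

def transform(t, memo, order):
    """t is {'name': str, 'inputs': [other t dicts]}; returns t's node id."""
    if t["name"] in memo:              # alreadyTransformed check
        return memo[t["name"]]
    for parent in t["inputs"]:         # getParentInputIds: recurse into inputs
        transform(parent, memo, order)
    node_id = len(memo) + 1            # "translate": allocate a StreamNode id
    memo[t["name"]] = node_id
    order.append(t["name"])
    return node_id

source = {"name": "source", "inputs": []}
flat_map = {"name": "flatMap", "inputs": [source]}
reduce_t = {"name": "reduce", "inputs": [flat_map]}
sink = {"name": "sink", "inputs": [reduce_t]}

memo, order = {}, []
for t in (flat_map, reduce_t, sink):   # like the env's transformations list
    transform(t, memo, order)
# order == ['source', 'flatMap', 'reduce', 'sink']: parents always come
# first, and flatMap is translated only once even though it appears both
# in the list and as reduce's input.
```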

Let's now walk through the concrete Transformations.

Translating OneInputTransformation

On entering translate(translator, transform), first look at the OneInputTransformation's fields:

[figure: stream-5.png]

In getParentInputIds(transform.getInputs()), the inputs of the OneInputTransformation are translated first; its input is the LegacySourceTransformation:

[figure: stream-6.png]

Translating the upstream input: LegacySourceTransformation

Likewise, its own inputs would be translated first; since LegacySourceTransformation has no input, it is translated directly:

[figure: stream-7.png]

Whether to translate for streaming or batch is decided by shouldExecuteInBatchMode in StreamGraphGenerator; here it is streaming mode.

return shouldExecuteInBatchMode
        ? translator.translateForBatch(transform, context)
        : translator.translateForStreaming(transform, context);
//SimpleTransformationTranslator
@Override
public final Collection<Integer> translateForStreaming(
        final T transformation, final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);
	
    final Collection<Integer> transformedIds =
            translateForStreamingInternal(transformation, context);
    configure(transformation, context);

    return transformedIds;
}

This dispatches to the LegacySourceTransformationTranslator in the flink-streaming-java module.

//LegacySourceTransformationTranslator

private Collection<Integer> translateInternal(
        final LegacySourceTransformation<OUT> transformation, final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    //1. Add the source operator
    streamGraph.addLegacySource(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            transformation.getOperatorFactory(),
            null,
            transformation.getOutputType(),
            "Source: " + transformation.getName());

    if (transformation.getOperatorFactory() instanceof InputFormatOperatorFactory) {
        streamGraph.setInputFormat(
                transformationId,
                ((InputFormatOperatorFactory) transformation.getOperatorFactory())
                        .getInputFormat());
    }
	//Set the parallelism
    final int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    return Collections.singleton(transformationId);
}
Adding the source operator:
//StreamGraph
public <IN, OUT> void addLegacySource(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    addOperator(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            inTypeInfo,
            outTypeInfo,
            operatorName);
    sources.add(vertexID);
}

public <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName) {
        Class<? extends AbstractInvokable> invokableClass =
                operatorFactory.isStreamSource()
                        ? SourceStreamTask.class
                        : OneInputStreamTask.class;
        addOperator(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                operatorFactory,
                inTypeInfo,
                outTypeInfo,
                operatorName,
                invokableClass);
    }

private <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName,
            Class<? extends AbstractInvokable> invokableClass) {

    	//Add the StreamNode: create it and register it in the node map
        addNode(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                invokableClass,
                operatorFactory,
                operatorName);
    	//Set the serializers for this transformation's input and output
        setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));
		//Set the outputType
        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
            // sets the output type which must be known at StreamGraph creation time
            operatorFactory.setOutputType(outTypeInfo, executionConfig);
        }
		//Set the inputType
        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inTypeInfo, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", vertexID);
        }
    }

[figure: stream-8.png]

After addOperator() has executed and the parallelism has been set:

[figure: stream-11.png]

You can see that parallelism=1 and maxParallelism=-1.
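The parallelism defaulting seen here, the transformation's own value if set, otherwise the job-wide value from the ExecutionConfig, can be stated as a sketch (illustrative Python, not Flink code):

```python
# Sketch of the ternary in translateInternal: a transformation keeps its
# own parallelism unless it is still PARALLELISM_DEFAULT (-1), in which
# case the job-wide value from the ExecutionConfig applies. maxParallelism
# is copied as-is, which is why the screenshot shows maxParallelism=-1.

PARALLELISM_DEFAULT = -1

def effective_parallelism(transformation_parallelism, execution_config_parallelism):
    if transformation_parallelism != PARALLELISM_DEFAULT:
        return transformation_parallelism
    return execution_config_parallelism

# WordCount's source: nothing set on the transformation, job default is 1
assert effective_parallelism(PARALLELISM_DEFAULT, 1) == 1
# An explicit setParallelism(4) on the operator wins:
assert effective_parallelism(4, 1) == 4
```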

Back in SimpleTransformationTranslator.translateForStreaming(), the next step is configure(transformation, context), which sets the uid, the user-provided node hash, resources, and so on.
private void configure(final T transformation, final Context context) {
    final StreamGraph streamGraph = context.getStreamGraph();
    final int transformationId = transformation.getId();

    StreamGraphUtils.configureBufferTimeout(
            streamGraph, transformationId, transformation, context.getDefaultBufferTimeout());
	//Set the operator uid
    if (transformation.getUid() != null) {
        streamGraph.setTransformationUID(transformationId, transformation.getUid());
    }
    if (transformation.getUserProvidedNodeHash() != null) {
        streamGraph.setTransformationUserHash(
                transformationId, transformation.getUserProvidedNodeHash());
    }

    StreamGraphUtils.validateTransformationUid(streamGraph, transformation);
	//Set and validate resources
    if (transformation.getMinResources() != null
            && transformation.getPreferredResources() != null) {
        streamGraph.setResources(
                transformationId,
                transformation.getMinResources(),
                transformation.getPreferredResources());
    }

    final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
    if (streamNode != null) {
        validateUseCaseWeightsNotConflict(
                streamNode.getManagedMemoryOperatorScopeUseCaseWeights(),
                transformation.getManagedMemoryOperatorScopeUseCaseWeights());
        streamNode.setManagedMemoryUseCaseWeights(
                transformation.getManagedMemoryOperatorScopeUseCaseWeights(),
                transformation.getManagedMemorySlotScopeUseCases());
    }
}

This completes the translation of LegacySourceTransformation; back to translating the OneInputTransformation:

protected Collection<Integer> translateInternal(
        final Transformation<OUT> transformation,
        final StreamOperatorFactory<OUT> operatorFactory,
        final TypeInformation<IN> inputType,
        @Nullable final KeySelector<IN, ?> stateKeySelector,
        @Nullable final TypeInformation<?> stateKeyType,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(operatorFactory);
    checkNotNull(inputType);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
	//Add the operator to the streamGraph
    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            inputType,
            transformation.getOutputType(),
            transformation.getName());
	//Set the operator's state key
    if (stateKeySelector != null) {
        TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
    }
	//Set the parallelism
    int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
	//Find the parent transformation
    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found "
                    + parentTransformations.size());
	//Add the StreamEdge
    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    return Collections.singleton(transformationId);
}

[figure: stream-12.png]

Similar to LegacySourceTransformation, this goes through adding the StreamNode, setting the operator's state key, setting the parallelism, and setting the uid, resources, and so on, after which the new StreamNode is present in the streamGraph.

Adding the StreamEdge:
//StreamGraph
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
    addEdgeInternal(
            upStreamVertexID,
            downStreamVertexID,
            typeNumber,
            null,
            new ArrayList<String>(),
            null,
            null);
}

private void addEdgeInternal(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode) {

        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    null,
                    outputTag,
                    exchangeMode);
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            exchangeMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    outputNames,
                    outputTag,
                    exchangeMode);
        } else {
            //Add the actual edge
            createActualEdge(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    outputTag,
                    exchangeMode);
        }
    }

private void createActualEdge(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode) {
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        //If no partitioner was specified and the up/downstream parallelism matches, use the FORWARD partitioning strategy; otherwise use REBALANCE
        if (partitioner == null
                && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        ...

        if (exchangeMode == null) {
            exchangeMode = StreamExchangeMode.UNDEFINED;
        }

        
        int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();

        StreamEdge edge =
                new StreamEdge(
                        upstreamNode,
                        downstreamNode,
                        typeNumber,
                        partitioner,
                        outputTag,
                        exchangeMode,
                        uniqueId);
		//Add the streamEdge to its source and target StreamNodes
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
 

With that, the StreamNode produced by LegacySourceTransformation and the StreamNode produced by OneInputTransformation are connected by a StreamEdge.
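The partitioner defaulting in createActualEdge can be condensed into a few lines (simplified Python sketch, not Flink code):

```python
# Sketch of createActualEdge's partitioner choice: an explicit partitioner
# (e.g. the HASH recorded by a virtual partition node) always wins;
# otherwise FORWARD when both ends have the same parallelism, else REBALANCE.

def choose_partitioner(partitioner, upstream_parallelism, downstream_parallelism):
    if partitioner is not None:
        return partitioner
    if upstream_parallelism == downstream_parallelism:
        return "FORWARD"
    return "REBALANCE"

assert choose_partitioner(None, 1, 1) == "FORWARD"      # Source -> Flat Map
assert choose_partitioner("HASH", 1, 1) == "HASH"       # keyBy edge
assert choose_partitioner(None, 1, 4) == "REBALANCE"    # parallelism change
```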

[figure: stream-14.png]

Translating ReduceTransformation

Its input, a PartitionTransformation, is processed first.

Translating PartitionTransformation

It finds its input and adds a virtual partition node; no StreamNode is generated. The PartitionTransformation's information is stored in Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes, where the key is the virtual partition node's id and the first element of the Tuple3 is the input's id.

private Collection<Integer> translateInternal(
        final PartitionTransformation<OUT> transformation, final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found "
                    + parentTransformations.size());
    final Transformation<?> input = parentTransformations.get(0);

    List<Integer> resultIds = new ArrayList<>();

    for (Integer inputId : context.getStreamNodeIds(input)) {
        final int virtualId = Transformation.getNewNodeId();
        //Add a virtual partition node; no StreamNode is generated
        streamGraph.addVirtualPartitionNode(
                inputId,
                virtualId,
                transformation.getPartitioner(),
                transformation.getExchangeMode());
        resultIds.add(virtualId);
    }
    return resultIds;
}

[figure: sream-15.png]

After the PartitionTransformation has been translated, the ReduceTransformation itself is processed:

//ReduceTransformationTranslator

@Override
public Collection<Integer> translateForStreamingInternal(
        final ReduceTransformation<IN, KEY> transformation, final Context context) {
    StreamGroupedReduceOperator<IN> groupedReduce =
            new StreamGroupedReduceOperator<>(
                    transformation.getReducer(),
                    transformation
                            .getInputType()
                            .createSerializer(context.getStreamGraph().getExecutionConfig()));

    SimpleOperatorFactory<IN> operatorFactory = SimpleOperatorFactory.of(groupedReduce);
    operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
    return translateInternal(
            transformation,
            operatorFactory,
            transformation.getInputType(),
            transformation.getKeySelector(),
            transformation.getKeyTypeInfo(),
            context);
}

As before, the StreamNode is generated first, then the StreamEdge is added. Creating the StreamEdge differs slightly from the previous cases, because ReduceTransformation's upstream is the PartitionTransformation, which is a virtual partition node.

} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
    int virtualId = upStreamVertexID;
    upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
    if (partitioner == null) {
        partitioner = virtualPartitionNodes.get(virtualId).f1;
    }
    exchangeMode = virtualPartitionNodes.get(virtualId).f2;
    addEdgeInternal(
            upStreamVertexID,
            downStreamVertexID,
            typeNumber,
            partitioner,
            outputNames,
            outputTag,
            exchangeMode);
}

[figure: stream-16.png]

The process is as follows:

OneInputTransformation produced the StreamNode with id=2, PartitionTransformation produced the virtual partition node (VirtualPartitionNode) with virtualId=6, and ReduceTransformation's StreamNode has id=4.

When ReduceTransformation's translation enters streamGraph.addEdgeInternal, it finds that its upstream's id exists in virtualPartitionNodes, and retrieves from it the virtual partition node's upstream id, StreamPartitioner, and StreamExchangeMode.

addEdgeInternal is then called recursively; the outputPartitioner, exchangeMode, and related information are stored in the StreamEdge, which is added to the outEdges of the StreamNode produced by OneInputTransformation (Flat Map-2) and to the inEdges of the StreamNode produced by ReduceTransformation (Keyed Aggregation-4).
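These steps can be sketched as follows (simplified Python; the ids match the WordCount example, and the function is an illustrative stand-in for addEdgeInternal):

```python
# Sketch of addEdgeInternal's handling of virtual partition nodes: if the
# upstream id is virtual, substitute its real upstream id, pick up the
# partitioner / exchange mode it recorded, and recurse to create the edge.

def add_edge(up_id, down_id, partitioner, exchange_mode,
             virtual_partition_nodes, edges):
    if up_id in virtual_partition_nodes:
        real_up, vp_partitioner, vp_exchange = virtual_partition_nodes[up_id]
        add_edge(real_up, down_id,
                 partitioner if partitioner is not None else vp_partitioner,
                 exchange_mode if exchange_mode is not None else vp_exchange,
                 virtual_partition_nodes, edges)
    else:
        edges.append((up_id, down_id,
                      partitioner if partitioner is not None else "FORWARD",
                      exchange_mode if exchange_mode is not None else "UNDEFINED"))

# WordCount: Flat Map-2 -> virtual node 6 (HASH) -> Keyed Aggregation-4
virtual_nodes = {6: (2, "HASH", None)}
edges = []
add_edge(6, 4, None, None, virtual_nodes, edges)
# edges == [(2, 4, 'HASH', 'UNDEFINED')]
```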

[figure: stream-15.png]

This completes the translation of ReduceTransformation; its streamNode is shown below.

[figure: stream-17.png]

Translating LegacySinkTransformation

The flow is the same as for the other Transformations.

//LegacySinkTransformationTranslator
private Collection<Integer> translateInternal(
        final LegacySinkTransformation<IN> transformation, final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found "
                    + parentTransformations.size());
    final Transformation<?> input = parentTransformations.get(0);
	//Add the sink node
    streamGraph.addSink(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            transformation.getOperatorFactory(),
            input.getOutputType(),
            null,
            "Sink: " + transformation.getName());

    final int parallelism =
            transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                    ? transformation.getParallelism()
                    : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    for (Integer inputId : context.getStreamNodeIds(input)) {
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    if (transformation.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer =
                transformation.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(
                transformationId, transformation.getStateKeySelector(), keySerializer);
    }

    return Collections.emptyList();
}
//StreamGraph
public <IN, OUT> void addSink(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    addOperator(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            inTypeInfo,
            outTypeInfo,
            operatorName);
    if (operatorFactory instanceof OutputFormatOperatorFactory) {
        setOutputFormat(
                vertexID, ((OutputFormatOperatorFactory) operatorFactory).getOutputFormat());
    }
    sinks.add(vertexID);
}

The logic closely mirrors that of LegacySourceTransformation.

[figure: stream-18.png]
