Flink Source Code Analysis: Generating the StreamGraph

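Prerequisite 1: DAG

One prerequisite for understanding the StreamGraph is familiarity with the DAG (directed acyclic graph): what its vertices and edges are, and how a DAG is built from them.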
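
To make the vertex and edge terminology concrete, here is a minimal sketch of a DAG in plain Java. The class DagSketch and its methods are hypothetical and are not part of Flink; the vertex names only imitate a small streaming job.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Flink code: a DAG stored as adjacency lists.
final class DagSketch {
    // vertex -> downstream vertices (one list entry per outgoing edge)
    private final Map<String, List<String>> outEdges = new LinkedHashMap<>();

    void addVertex(String v) {
        outEdges.putIfAbsent(v, new ArrayList<>());
    }

    void addEdge(String from, String to) {
        addVertex(from);
        addVertex(to);
        outEdges.get(from).add(to);
    }

    // Kahn's algorithm: repeatedly emit vertices whose inputs were all emitted.
    // Throws if the graph contains a cycle, i.e. if it is not a DAG.
    List<String> topologicalOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String v : outEdges.keySet()) {
            inDegree.put(v, 0);
        }
        for (List<String> targets : outEdges.values()) {
            for (String t : targets) {
                inDegree.merge(t, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            for (String t : outEdges.get(v)) {
                if (inDegree.merge(t, -1, Integer::sum) == 0) {
                    ready.add(t);
                }
            }
        }
        if (order.size() != outEdges.size()) {
            throw new IllegalStateException("graph contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        DagSketch dag = new DagSketch();
        dag.addEdge("source", "map");
        dag.addEdge("map", "sink");
        System.out.println(dag.topologicalOrder()); // prints [source, map, sink]
    }
}
```

The StreamGraph discussed below follows the same idea: StreamNode objects play the role of vertices and StreamEdge objects the role of edges.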

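Prerequisite 2: Transformation

Each operator call in a Flink streaming program is converted into a Transformation object and stored. The StreamGraph is built from this collection of Transformation objects.

Transformation types
  • OneInputTransformation
    A transformation with exactly one input, produced by operators such as map, filter, and process.
  • TwoInputTransformation
    A transformation with two inputs. Operators on ConnectedStreams are two-stream operations and are converted into a TwoInputTransformation.
  • SourceTransformation
    Calling env.addSource() creates a DataStreamSource, and the DataStreamSource constructor creates the source transformation. Note that in recent Flink versions the addSource path produces a LegacySourceTransformation, while env.fromSource() produces the SourceTransformation whose translator is examined later in this article.
  • SinkTransformation
    Similar to SourceTransformation: calling addSink on a DataStream creates a DataStreamSink, which constructs a SinkTransformation when it is created.
  • UnionTransformation
    Calling union on a DataStream creates a UnionTransformation: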
    public final DataStream<T> union(DataStream<T>... streams) {
        List<Transformation<T>> unionedTransforms = new ArrayList<>();
        unionedTransforms.add(this.transformation);

        for (DataStream<T> newStream : streams) {
            if (!getType().equals(newStream.getType())) {
                throw new IllegalArgumentException(
                        "Cannot union streams of different types: "
                                + getType()
                                + " and "
                                + newStream.getType());
            }

            unionedTransforms.add(newStream.getTransformation());
        }
        return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
    }
  • FeedbackTransformation
    Calling iterate() on a DataStream creates an IterativeStream and, together with it, a FeedbackTransformation:
    protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
        super(
                dataStream.getExecutionEnvironment(),
                new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
        this.originalInput = dataStream;
        this.maxWaitTime = maxWaitTime;
        setBufferTimeout(dataStream.environment.getBufferTimeout());
    }
  • CoFeedbackTransformation
    Constructing a ConnectedIterativeStreams creates a CoFeedbackTransformation:
    public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {

        private CoFeedbackTransformation<F> coFeedbackTransformation;

        public ConnectedIterativeStreams(
                DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) {
            super(
                    input.getExecutionEnvironment(),
                    input,
                    new DataStream<>(
                            input.getExecutionEnvironment(),
                            new CoFeedbackTransformation<>(
                                    input.getParallelism(), feedbackType, waitTime)));
            this.coFeedbackTransformation =
                    (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
        }

        // ... remaining members omitted
    }
  • PartitionTransformation
    Operators that control how data is routed, such as shuffle, forward, rebalance, and keyBy, all produce a PartitionTransformation:
    @PublicEvolving
    public DataStream<T> shuffle() {
        return setConnectionType(new ShufflePartitioner<T>());
    }

    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        return new DataStream<>(
                this.getExecutionEnvironment(),
                new PartitionTransformation<>(this.getTransformation(), partitioner));
    }

  • SideOutputTransformation
    A SideOutputTransformation is created when a side output is used.

Building the StreamGraph

We start the analysis from the execute method of StreamExecutionEnvironment:

public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        final StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        return execute(streamGraph);
    }

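execute calls getStreamGraph(), which produces a StreamGraph instance; execute then sets the job name on it and submits it for execution. All of the StreamGraph generation logic therefore lives in getStreamGraph(), so we look there next.

The getStreamGraph method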

public StreamGraph getStreamGraph() {
        return getStreamGraph(true);
    }
 public StreamGraph getStreamGraph(boolean clearTransformations) {
        final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
        if (clearTransformations) {
            transformations.clear();
        }
        return streamGraph;
    }

The call getStreamGraphGenerator(transformations).generate() builds a StreamGraphGenerator from the transformations list and then calls generate() to produce the StreamGraph. This raises a question: when is the transformations list populated? Tracing the code shows that it is filled in the addOperator method of StreamExecutionEnvironment:

    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }

addOperator is called as operators are applied to a stream, for example in addSink:

    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }
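
The bookkeeping described so far (operators register a transformation, and getStreamGraph optionally clears the list afterwards) can be sketched with a toy model. MiniEnv, MiniTransformation, and snapshot are hypothetical names invented for this sketch; only the null check and the list append imitate the addOperator method shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for StreamExecutionEnvironment and Transformation.
final class TransformationListSketch {

    static final class MiniTransformation {
        final String name;
        final MiniTransformation input; // null for a source

        MiniTransformation(String name, MiniTransformation input) {
            this.name = name;
            this.input = input;
        }
    }

    static final class MiniEnv {
        final List<MiniTransformation> transformations = new ArrayList<>();

        // Imitates the null check and the list append of addOperator.
        void addOperator(MiniTransformation t) {
            if (t == null) {
                throw new NullPointerException("transformation must not be null.");
            }
            transformations.add(t);
        }

        // Stand-in for getStreamGraph(boolean): returns the names that would be
        // handed to the generator, then optionally clears the list.
        List<String> snapshot(boolean clearTransformations) {
            List<String> names = new ArrayList<>();
            for (MiniTransformation t : transformations) {
                names.add(t.name);
            }
            if (clearTransformations) {
                transformations.clear();
            }
            return names;
        }
    }

    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        MiniTransformation source = new MiniTransformation("source", null);
        MiniTransformation map = new MiniTransformation("map", source);
        MiniTransformation sink = new MiniTransformation("sink", map);
        // In this toy, the source stays reachable through the input links.
        env.addOperator(map);
        env.addOperator(sink);
        System.out.println(env.snapshot(true));          // names recorded so far
        System.out.println(env.transformations.size()); // size after clearing
    }
}
```

Clearing the list after generation means that a later call starts from an empty set of registered transformations, which is why the boolean parameter exists in the toy as well.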

Now back to the generate method, which produces the StreamGraph:

    public StreamGraph generate() {
        // set up the configuration of the new StreamGraph
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        streamGraph.setEnableCheckpointsAfterTasksFinish(
                configuration.get(
                        ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
        shouldExecuteInBatchMode = shouldExecuteInBatchMode();
        configureStreamGraph(streamGraph);

        // cache of the transformations that have already been translated
        alreadyTransformed = new HashMap<>();

        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }

        streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);

        setFineGrainedGlobalStreamExchangeMode(streamGraph);

        for (StreamNode node : streamGraph.getStreamNodes()) {
            if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                for (StreamEdge edge : node.getInEdges()) {
                    edge.setSupportsUnalignedCheckpoints(false);
                }
            }
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

Next, we trace the transform method:

    private Collection<Integer> transform(Transformation<?> transform) {
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // collect the resource configuration of the transformation's slot sharing group
        transform
                .getSlotSharingGroup()
                .ifPresent(
                        slotSharingGroup -> {
                            final ResourceSpec resourceSpec =
                                    SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
                            if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
                                slotSharingGroupResources.compute(
                                        slotSharingGroup.getName(),
                                        (name, profile) -> {
                                            if (profile == null) {
                                                return ResourceProfile.fromResourceSpec(
                                                        resourceSpec, MemorySize.ZERO);
                                            } else if (!ResourceProfile.fromResourceSpec(
                                                            resourceSpec, MemorySize.ZERO)
                                                    .equals(profile)) {
                                                throw new IllegalArgumentException(
                                                        "The slot sharing group "
                                                                + slotSharingGroup.getName()
                                                                + " has been configured with two different resource spec.");
                                            } else {
                                                return profile;
                                            }
                                        });
                            }
                        });

        // check the output type
        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        // look up the TransformationTranslator registered for this Transformation type;
        // each type is translated by its own translator
        @SuppressWarnings("unchecked")
        final TransformationTranslator<?, Transformation<?>> translator =
                (TransformationTranslator<?, Transformation<?>>)
                        translatorMap.get(transform.getClass());

        Collection<Integer> transformedIds;
        if (translator != null) {
            transformedIds = translate(translator, transform);
        } else {
            transformedIds = legacyTransform(transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        return transformedIds;
    }
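
The role of the alreadyTransformed map can be illustrated with a small sketch of memoized, inputs-first traversal. Node, translationOrder, and the class name are hypothetical and invented for this sketch; the real method delegates the actual work to translators, which is reduced here to recording an ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the caching pattern; none of these types are Flink classes.
final class MemoizedTransformSketch {

    static final class Node {
        final int id;
        final List<Node> inputs;

        Node(int id, Node... inputs) {
            this.id = id;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<Integer> translationOrder = new ArrayList<>();

    Collection<Integer> transform(Node node) {
        // Cache hit: this node was already reached through another path.
        if (alreadyTransformed.containsKey(node)) {
            return alreadyTransformed.get(node);
        }
        // Inputs are handled first so that upstream nodes exist before this one.
        for (Node input : node.inputs) {
            transform(input);
        }
        translationOrder.add(node.id); // stand-in for the real translation work
        Collection<Integer> ids = Collections.singleton(node.id);
        alreadyTransformed.put(node, ids);
        return ids;
    }

    public static void main(String[] args) {
        Node source = new Node(1);
        Node mapA = new Node(2, source);
        Node mapB = new Node(3, source);
        MemoizedTransformSketch generator = new MemoizedTransformSketch();
        generator.transform(mapA);
        generator.transform(mapB);
        // The shared source is translated only once: prints [1, 2, 3]
        System.out.println(generator.translationOrder);
    }
}
```

Without the cache, a transformation that feeds several downstream operators would be translated once per consumer and would produce duplicate nodes.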

Next comes the translate method:

    private Collection<Integer> translate(
            final TransformationTranslator<?, Transformation<?>> translator,
            final Transformation<?> transform) {
        checkNotNull(translator);
        checkNotNull(transform);

        final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

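        // Determine the slot sharing group of the transformation:
        // 1. If a slot sharing group was set explicitly, use it.
        // 2. If none was set, use the slot sharing group of the upstream inputs.
        // 3. If the inputs belong to different slot sharing groups, use the default group.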
        final String slotSharingGroup =
                determineSlotSharingGroup(
                        transform.getSlotSharingGroup().isPresent()
                                ? transform.getSlotSharingGroup().get().getName()
                                : null,
                        allInputIds.stream()
                                .flatMap(Collection::stream)
                                .collect(Collectors.toList()));

        final TransformationTranslator.Context context =
                new ContextImpl(this, streamGraph, slotSharingGroup, configuration);

        return shouldExecuteInBatchMode
                ? translator.translateForBatch(transform, context)
                : translator.translateForStreaming(transform, context);
    }

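translate contains an important piece of logic for determining the slot sharing group:

  • If a slot sharing group was set explicitly, that group is used.
  • If none was set, the slot sharing group of the upstream inputs is used.
  • If the inputs belong to different slot sharing groups, the default group is used.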
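
The three rules above can be sketched as a standalone function. This is an illustration under stated assumptions, not the body of determineSlotSharingGroup: the groupOfNode map replaces the lookup on the StreamGraph, the group name "default" is assumed, and returning the default group when there are no inputs at all is a choice made for this sketch.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the slot sharing group rules; not Flink code.
final class SlotSharingGroupSketch {
    static final String DEFAULT_GROUP = "default"; // assumed name of the default group

    static String determine(
            String specifiedGroup,
            Collection<Integer> inputIds,
            Map<Integer, String> groupOfNode) {
        if (specifiedGroup != null) {
            return specifiedGroup; // rule 1: an explicit setting wins
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return DEFAULT_GROUP; // rule 3: the inputs disagree
            }
        }
        // rule 2: inherit from the inputs (sketch assumption: default if there are none)
        return inputGroup == null ? DEFAULT_GROUP : inputGroup;
    }

    public static void main(String[] args) {
        Map<Integer, String> groups = new HashMap<>();
        groups.put(1, "ingest");
        groups.put(2, "ingest");
        groups.put(3, "enrich");

        System.out.println(determine("custom", Arrays.asList(1, 2), groups)); // custom
        System.out.println(determine(null, Arrays.asList(1, 2), groups));     // ingest
        System.out.println(determine(null, Arrays.asList(1, 3), groups));     // default
        System.out.println(determine(null, Collections.emptyList(), groups)); // default
    }
}
```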

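After that, translate builds the TransformationTranslator context and dispatches to either the batch or the streaming translation. For streaming mode, for example:

    @Override
    public final Collection<Integer> translateForStreaming(
            final T transformation, final Context context) {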
        checkNotNull(transformation);
        checkNotNull(context);

        final Collection<Integer> transformedIds =
                translateForStreamingInternal(transformation, context);
        configure(transformation, context);

        return transformedIds;
    }

translateForStreamingInternal is an abstract method; each Transformation type is handled by its own implementation. We use SourceTransformationTranslator to look at a concrete implementation:

    @Override
    protected Collection<Integer> translateForStreamingInternal(
            final SourceTransformation<OUT, SplitT, EnumChkT> transformation,
            final Context context) {

        return translateInternal(
                transformation, context, true /* emit progressive watermarks */);
    }

    private Collection<Integer> translateInternal(
            final SourceTransformation<OUT, SplitT, EnumChkT> transformation,
            final Context context,
            boolean emitProgressiveWatermarks) {
        checkNotNull(transformation);
        checkNotNull(context);
        
        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

        // create the SourceOperator factory
        SourceOperatorFactory<OUT> operatorFactory =
                new SourceOperatorFactory<>(
                        transformation.getSource(),
                        transformation.getWatermarkStrategy(),
                        emitProgressiveWatermarks);

        // set the operator chaining strategy
        operatorFactory.setChainingStrategy(transformation.getChainingStrategy());

        // add the source to the StreamGraph
        streamGraph.addSource(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                operatorFactory,
                null,
                transformation.getOutputType(),
                "Source: " + transformation.getName());

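        // set the parallelism: use the transformation's own value if one was set,
        // otherwise fall back to the job-wide value from the ExecutionConfig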
        final int parallelism =
                transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                        ? transformation.getParallelism()
                        : executionConfig.getParallelism();

        streamGraph.setParallelism(transformationId, parallelism);
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
        return Collections.singleton(transformationId);
    }
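
The parallelism fallback used in translateInternal can be isolated as a tiny sketch. ParallelismSketch and resolve are hypothetical names; the constant value -1 is assumed here to mirror ExecutionConfig.PARALLELISM_DEFAULT.

```java
// Hypothetical sketch of the parallelism fallback; not Flink code.
final class ParallelismSketch {
    // Assumed to mirror ExecutionConfig.PARALLELISM_DEFAULT, meaning "not set".
    static final int PARALLELISM_DEFAULT = -1;

    // An operator-level setting wins; otherwise the job-wide setting is used.
    static int resolve(int transformationParallelism, int jobParallelism) {
        return transformationParallelism != PARALLELISM_DEFAULT
                ? transformationParallelism
                : jobParallelism;
    }

    public static void main(String[] args) {
        System.out.println(resolve(PARALLELISM_DEFAULT, 4)); // 4: falls back to the job setting
        System.out.println(resolve(2, 4));                   // 2: the operator setting wins
    }
}
```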

Next, we trace the addSource method:

    public <IN, OUT> void addSource(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            SourceOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName) {
        addOperator(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                operatorFactory,
                inTypeInfo,
                outTypeInfo,
                operatorName,
                SourceOperatorStreamTask.class);
        sources.add(vertexID);
    }

addSource calls addOperator and then adds the vertex ID to the sources field of the StreamGraph:

    private <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName,
            Class invokableClass) {

        addNode(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                invokableClass,
                operatorFactory,
                operatorName);
        setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));

        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
            // sets the output type which must be know at StreamGraph creation time
            operatorFactory.setOutputType(outTypeInfo, executionConfig);
        }

        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inTypeInfo, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", vertexID);
        }
    }

addOperator in turn calls addNode:

 protected StreamNode addNode(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class vertexClass,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName) {

        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }

        StreamNode vertex =
                new StreamNode(
                        vertexID,
                        slotSharingGroup,
                        coLocationGroup,
                        operatorFactory,
                        operatorName,
                        vertexClass);

        streamNodes.put(vertexID, vertex);

        return vertex;
    }

As we can see, the node information is ultimately wrapped in a StreamNode and stored in the streamNodes map. Because this is a source node, no edge is added. Let us take a brief look at how a OneInputTransformation is added to the StreamGraph:

    protected Collection<Integer> translateInternal(
            final Transformation<OUT> transformation,
            final StreamOperatorFactory<OUT> operatorFactory,
            final TypeInformation<IN> inputType,
            @Nullable final KeySelector<IN, ?> stateKeySelector,
            @Nullable final TypeInformation<?> stateKeyType,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(operatorFactory);
        checkNotNull(inputType);
        checkNotNull(context);

        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

        streamGraph.addOperator(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                operatorFactory,
                inputType,
                transformation.getOutputType(),
                transformation.getName());

        if (stateKeySelector != null) {
            TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
        }

        int parallelism =
                transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                        ? transformation.getParallelism()
                        : executionConfig.getParallelism();
        streamGraph.setParallelism(transformationId, parallelism);
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
                parentTransformations.size() == 1,
                "Expected exactly one input transformation but found "
                        + parentTransformations.size());

        for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
            streamGraph.addEdge(inputId, transformationId, 0);
        }

        return Collections.singleton(transformationId);
    }
}

As we can see, AbstractOneInputTransformationTranslator.translateInternal calls streamGraph.addEdge:

 public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
        addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                null,
                new ArrayList<String>(),
                null,
                null);
    }
private void addEdgeInternal(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode) {

        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    null,
                    outputTag,
                    exchangeMode);
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            exchangeMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    outputNames,
                    outputTag,
                    exchangeMode);
        } else {
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);

            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            if (partitioner == null
                    && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }
            }

            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException(
                            "Forward partitioning does not allow "
                                    + "change of parallelism. Upstream operation: "
                                    + upstreamNode
                                    + " parallelism: "
                                    + upstreamNode.getParallelism()
                                    + ", downstream operation: "
                                    + downstreamNode
                                    + " parallelism: "
                                    + downstreamNode.getParallelism()
                                    + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

            if (exchangeMode == null) {
                exchangeMode = StreamExchangeMode.UNDEFINED;
            }

            StreamEdge edge =
                    new StreamEdge(
                            upstreamNode,
                            downstreamNode,
                            typeNumber,
                            partitioner,
                            outputTag,
                            exchangeMode);

            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }
 

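Adding an edge also determines how data is distributed between the upstream and downstream operators. The addEdge path for a OneInputTransformation shows an important rule: if no partitioner is specified and the upstream and downstream operators have the same parallelism, data is forwarded (ForwardPartitioner); if no partitioner is specified and the parallelism differs, data is rebalanced (RebalancePartitioner). An explicitly specified ForwardPartitioner with differing parallelism is rejected with an UnsupportedOperationException.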
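
The partitioner selection can be sketched as a small decision function. The enum and the method choose are hypothetical simplifications: the real code works with StreamPartitioner instances rather than enum constants.

```java
// Hypothetical sketch of the default partitioner choice in addEdgeInternal; not Flink code.
final class PartitionerChoiceSketch {
    enum Partitioner { FORWARD, REBALANCE, SHUFFLE }

    static Partitioner choose(
            Partitioner requested, int upstreamParallelism, int downstreamParallelism) {
        Partitioner partitioner = requested;
        if (partitioner == null) {
            // No explicit partitioner: forward if the parallelism matches, else rebalance.
            partitioner =
                    upstreamParallelism == downstreamParallelism
                            ? Partitioner.FORWARD
                            : Partitioner.REBALANCE;
        }
        if (partitioner == Partitioner.FORWARD
                && upstreamParallelism != downstreamParallelism) {
            // Only reachable when FORWARD was requested explicitly.
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism.");
        }
        return partitioner;
    }

    public static void main(String[] args) {
        System.out.println(choose(null, 4, 4));                // FORWARD
        System.out.println(choose(null, 4, 2));                // REBALANCE
        System.out.println(choose(Partitioner.SHUFFLE, 4, 2)); // SHUFFLE
    }
}
```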

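This code also involves virtual nodes. What is a virtual node? Side output, select, and partitioning operations do not take user-defined processing logic, so these transformations are turned into virtual nodes. Strictly speaking, a virtual node is not a StreamNode and contains no physical transformation logic, and it does not appear in the processing flow of the StreamGraph. When an edge is added and its upstream node is virtual, the upstream node is resolved recursively until a non-virtual node is found, and only then is the edge actually added. A virtual node is attached to a non-virtual node through its internal originalId property (the f0 field of the entries in virtualSideOutputNodes and virtualPartitionNodes in the code above).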
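
The recursive resolution of virtual nodes can be sketched as follows. All names are hypothetical simplifications: partitioners are plain strings, only partition-type virtual nodes are modeled, and edges are recorded as text instead of StreamEdge objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of virtual node resolution; not Flink code.
final class VirtualNodeSketch {

    // A virtual node only remembers which node it is attached to and a partitioner.
    static final class VirtualPartition {
        final int originalId;
        final String partitioner;

        VirtualPartition(int originalId, String partitioner) {
            this.originalId = originalId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualPartition(originalId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Upstream is virtual: keep a partitioner that was already found,
            // otherwise take the one stored on the virtual node, then recurse.
            String effective = partitioner != null ? partitioner : virtual.partitioner;
            addEdge(virtual.originalId, downstreamId, effective);
        } else {
            // Upstream is a physical node: the edge is finally recorded.
            edges.add(upstreamId + " -> " + downstreamId + " [" + partitioner + "]");
        }
    }

    public static void main(String[] args) {
        VirtualNodeSketch graph = new VirtualNodeSketch();
        graph.addVirtualPartitionNode(1, 5, "rebalance"); // virtual node 5 sits on node 1
        graph.addVirtualPartitionNode(5, 6, "shuffle");   // virtual node 6 sits on virtual node 5
        graph.addEdge(6, 2, null);
        System.out.println(graph.edges); // prints [1 -> 2 [shuffle]]
    }
}
```

In this sketch the partitioner closest to the downstream operator wins, because a partitioner that was already found is never overwritten during the recursion; this matches the `if (partitioner == null)` guard in the addEdgeInternal excerpt above.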

Reference: https://www.jianshu.com/p/138370b6b820
