Flink Task Scheduling Logic (based on 1.12)

Flink Task Scheduling and Deployment Principles (based on 1.12)

Terminology
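  1. Region: A pipelined region is defined as the set of tasks that are connected via pipelined data exchanges. In other words, a region is a set of subtasks that exchange data with one another in a pipelined fashion.
  2. ExecutionVertex: one parallel instance of a vertex in the job's logical topology, i.e. a subtask.
  3. JobVertex: a vertex in the job's logical topology.
  4. CoLocationGroup: a group of JobVertices with a hard constraint: the subtasks that share the same index across the JobVertices must run on the same TaskManager.
  5. SlotSharingGroup: a group of JobVertices with a soft constraint: subtasks of these JobVertices may run in the same slot, and their indices do not have to correspond.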
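To make the Region definition concrete, here is a minimal sketch that treats each subtask as a node, merges the two ends of every pipelined exchange with a union-find structure, and reports each connected component as a region; blocking exchanges are the only edges that separate regions. The names (PipelinedRegionSketch, Edge, ExchangeType, computeRegions) are hypothetical, and this only illustrates the definition, not Flink's own region computation.

```java
import java.util.*;

/** Hypothetical illustration of the pipelined-region definition; not Flink code. */
class PipelinedRegionSketch {

    enum ExchangeType { PIPELINED, BLOCKING }

    /** A data exchange between two subtasks. */
    record Edge(String producer, String consumer, ExchangeType type) {}

    private final Map<String, String> parent = new HashMap<>();

    /** Union-find lookup with path compression. */
    private String find(String v) {
        parent.putIfAbsent(v, v);
        String p = parent.get(v);
        if (!p.equals(v)) {
            p = find(p);
            parent.put(v, p);
        }
        return p;
    }

    private void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    /** Subtasks joined by a PIPELINED edge end up in the same region; BLOCKING edges are ignored. */
    static Collection<Set<String>> computeRegions(List<String> subtasks, List<Edge> edges) {
        PipelinedRegionSketch uf = new PipelinedRegionSketch();
        subtasks.forEach(uf::find); // register every subtask, including isolated ones
        for (Edge e : edges) {
            if (e.type() == ExchangeType.PIPELINED) {
                uf.union(e.producer(), e.consumer());
            }
        }
        Map<String, Set<String>> regions = new LinkedHashMap<>();
        for (String v : subtasks) {
            regions.computeIfAbsent(uf.find(v), k -> new TreeSet<>()).add(v);
        }
        return regions.values();
    }
}
```

For example, with a -> b (pipelined), b -> c (blocking) and c -> d (pipelined), the sketch yields the two regions {a, b} and {c, d}.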
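Slot sharing strategy (SlotSharingStrategy)

Determines how SlotSharingGroups are split up. Currently there is only one strategy:

  • LocalInputPreferredSlotSharingStrategy: splits SlotSharingGroups so as to minimize data transfer across the network. Subtasks that run in the same slot are deployed in the same TaskExecutor process, so the data exchanged between them does not have to travel over the network.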

This strategy tries to reduce remote data exchanges. Execution vertices, which are connected and belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location constraints will be respected.
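The grouping rule quoted above can be sketched as follows, assuming subtasks are visited in topological order and a group may hold at most one subtask per JobVertex. The names (LocalInputPreferredSketch, Subtask, assignGroups) are hypothetical; co-location constraints and the split per SlotSharingGroup are left out, so this is only a simplified model of the strategy, not its implementation.

```java
import java.util.*;

/** Simplified, hypothetical model of local-input-preferred slot sharing; not Flink's classes. */
class LocalInputPreferredSketch {

    /** A subtask: its job vertex, its subtask index, and the subtasks it reads from. */
    record Subtask(String jobVertex, int index, List<Subtask> inputs) {}

    /** Assigns subtasks (given in topological order) to groups; a group never holds two subtasks of one job vertex. */
    static List<List<Subtask>> assignGroups(List<Subtask> topologicallySorted) {
        List<List<Subtask>> groups = new ArrayList<>();
        Map<Subtask, List<Subtask>> groupOf = new HashMap<>();

        for (Subtask subtask : topologicallySorted) {
            List<Subtask> target = null;

            // 1. Prefer a group that already holds one of this subtask's producers (local input).
            for (Subtask producer : subtask.inputs()) {
                List<Subtask> candidate = groupOf.get(producer);
                if (candidate != null && !hasJobVertex(candidate, subtask.jobVertex())) {
                    target = candidate;
                    break;
                }
            }
            // 2. Otherwise reuse any group that has no subtask of this job vertex yet.
            if (target == null) {
                for (List<Subtask> candidate : groups) {
                    if (!hasJobVertex(candidate, subtask.jobVertex())) {
                        target = candidate;
                        break;
                    }
                }
            }
            // 3. Otherwise open a new group.
            if (target == null) {
                target = new ArrayList<>();
                groups.add(target);
            }
            target.add(subtask);
            groupOf.put(subtask, target);
        }
        return groups;
    }

    private static boolean hasJobVertex(List<Subtask> group, String jobVertex) {
        return group.stream().anyMatch(s -> s.jobVertex().equals(jobVertex));
    }
}
```

If map[0] reads from source[1] and map[1] from source[0], the sketch puts map[0] in source[1]'s group and map[1] in source[0]'s group, so each map subtask reads its input locally instead of simply taking the first free group.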

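Slot selection strategy (SlotSelectionStrategy)
  • DefaultLocationPreferenceSlotSelectionStrategy: the default strategy. It prefers a slot on the preferred TaskManager(s); otherwise it takes, in order, the first slot in the slot pool that satisfies the resource requirement.
  • EvenlySpreadOutLocationPreferenceSlotSelectionStrategy: also prefers a slot on the preferred TaskManager(s); otherwise it picks a slot on the TaskManager with the lowest utilization.
  • PreviousAllocationSlotSelectionStrategy: prefers the slot used by the previous allocation; otherwise it falls back to the configured fallback strategy (one of the two above).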
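The three selection rules can be compared side by side in a small sketch. It assumes a flat list of free slots, a single integer resource requirement and a precomputed utilization value per TaskManager; all names are hypothetical, and Flink's real strategies score candidates in more detail. In Flink itself, the evenly-spread-out variant is enabled through the option cluster.evenly-spread-out-slots.

```java
import java.util.*;

/** Hypothetical comparison of the three slot selection rules; not Flink's implementation. */
class SlotSelectionSketch {

    /** A free slot: id, owning TaskManager, free resource units, and that TaskManager's utilization (0..1). */
    record Slot(String id, String taskManager, int freeResources, double taskManagerUtilization) {}

    /** Shared first step: a fitting slot on one of the preferred TaskManagers, if any. */
    private static Optional<Slot> onPreferredTaskManager(List<Slot> pool, Set<String> preferred, int required) {
        return pool.stream()
                .filter(s -> s.freeResources() >= required && preferred.contains(s.taskManager()))
                .findFirst();
    }

    /** Default: preferred TaskManager first, otherwise the first fitting slot in pool order. */
    static Optional<Slot> defaultSelect(List<Slot> pool, Set<String> preferred, int required) {
        return onPreferredTaskManager(pool, preferred, required)
                .or(() -> pool.stream().filter(s -> s.freeResources() >= required).findFirst());
    }

    /** Evenly spread out: preferred TaskManager first, otherwise a fitting slot on the least-utilized TaskManager. */
    static Optional<Slot> evenlySpreadOutSelect(List<Slot> pool, Set<String> preferred, int required) {
        return onPreferredTaskManager(pool, preferred, required)
                .or(() -> pool.stream()
                        .filter(s -> s.freeResources() >= required)
                        .min(Comparator.comparingDouble(Slot::taskManagerUtilization)));
    }

    /** Previous allocation: reuse the previously held slot if it is still free, otherwise delegate. */
    static Optional<Slot> previousAllocationSelect(
            List<Slot> pool, String previousSlotId, Set<String> preferred, int required, boolean spreadOut) {
        Optional<Slot> previous = pool.stream()
                .filter(s -> s.id().equals(previousSlotId) && s.freeResources() >= required)
                .findFirst();
        return previous.or(() -> spreadOut
                ? evenlySpreadOutSelect(pool, preferred, required)
                : defaultSelect(pool, preferred, required));
    }
}
```

With no location preference, the default rule returns the first fitting slot in the pool, while the spread-out rule skips to the slot on the least busy TaskManager; this is the trade-off between packing slots and balancing load across TaskManagers.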
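Scheduling strategy (SchedulingStrategy)

Determines how tasks are scheduled.
It is configured with jobmanager.scheduler.scheduling-strategy, whose value is region or legacy.

  1. region (PIPELINED_REGION_SCHEDULING)[1]:

    Pipelined region scheduling, the default strategy. Implementation class: PipelinedRegionSchedulingStrategy.

  2. legacy (LEGACY_SCHEDULING):

    Legacy scheduling, with two implementation classes: EagerSchedulingStrategy, used mainly for streaming jobs, schedules all tasks immediately; LazyFromSourcesSchedulingStrategy, used mainly for batch jobs, schedules a downstream task only once its input data is ready.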
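The two legacy strategies differ in which vertices are schedulable at a given moment. A minimal sketch with hypothetical names, which simplifies "input data is ready" to "every producer has finished":

```java
import java.util.*;
import java.util.stream.Collectors;

/** Hypothetical contrast of eager vs. lazy-from-sources scheduling; not Flink code. */
class LegacyStrategiesSketch {

    /** A job vertex: its name and the names of the vertices it consumes from. */
    record Vertex(String name, List<String> inputs) {}

    /** Eager (streaming): every vertex is scheduled right away. */
    static List<String> eagerSchedule(List<Vertex> vertices) {
        return vertices.stream().map(Vertex::name).collect(Collectors.toList());
    }

    /**
     * Lazy from sources (batch): a not-yet-scheduled vertex becomes schedulable once all of
     * its inputs have finished; sources (no inputs) are schedulable immediately.
     */
    static List<String> lazyFromSourcesSchedule(
            List<Vertex> vertices, Set<String> finished, Set<String> alreadyScheduled) {
        return vertices.stream()
                .filter(v -> !alreadyScheduled.contains(v.name()))
                .filter(v -> finished.containsAll(v.inputs()))
                .map(Vertex::name)
                .collect(Collectors.toList());
    }
}
```

For a source -> map -> sink job, the eager rule returns all three vertices at once, whereas the lazy rule first returns only source, and returns map only after source has finished.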

Scheduling logic

The following describes the scheduling logic under the region scheduling strategy.

Call chain:
PipelinedRegionSchedulingStrategy.startScheduling -> PipelinedRegionSchedulingStrategy.maybeScheduleRegions -> DefaultScheduler.allocateSlotsAndDeploy -> DefaultScheduler.allocateSlots -> ExecutionSlotAllocator.allocateSlotsFor

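  • Scheduling the source regions

Fetch the pipelined regions already computed in the scheduling topology, then pick out and schedule the most upstream ones, i.e. the regions that consume no results and therefore contain the source vertices.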

    @Override
    public void startScheduling() {
        // Source regions are the regions that consume no result partitions.
        final Set<SchedulingPipelinedRegion> sourceRegions =
                IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                        .filter(region -> !region.getConsumedResults().iterator().hasNext())
                        .collect(Collectors.toSet());
        maybeScheduleRegions(sourceRegions);
    }
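The source-region filter in startScheduling() boils down to "keep the regions whose consumed-results iterator is empty". A self-contained sketch of just that filter, using a hypothetical Region record in place of SchedulingPipelinedRegion:

```java
import java.util.*;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the source-region filter; not Flink code. */
class SourceRegionSketch {

    /** A region: an ID and the IDs of the result partitions it consumes. */
    record Region(String id, List<String> consumedResults) {}

    /** Same predicate as startScheduling(): a region that consumes no results is a source region. */
    static Set<String> sourceRegionIds(List<Region> allRegions) {
        return allRegions.stream()
                .filter(region -> !region.consumedResults().iterator().hasNext())
                .map(Region::id)
                .collect(Collectors.toCollection(TreeSet::new));
    }
}
```

Given regions r1 (no inputs), r2 (reads r1's output) and r3 (no inputs), only r1 and r3 are scheduled at startup; r2 waits until its input becomes consumable.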

  • Collect all subtasks of the region and allocate slots for them as a group. An ExecutionVertexDeploymentOption holds the deployment information of a single subtask.

    private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
        // Wait until every input of the region is consumable.
        if (!areRegionInputsAllConsumable(region)) {
            return;
        }

        // Only a region whose vertices are all still CREATED may be scheduled.
        checkState(
                areRegionVerticesAllInCreatedState(region),
                "BUG: trying to schedule a region which is not in CREATED state");

        // One deployment option per subtask, in the region's sorted vertex order.
        final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
                SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                        regionVerticesSorted.get(region), id -> deploymentOption);
        schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
    }
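The gating in maybeScheduleRegion() can be modelled as follows. This is a hypothetical, simplified model: "consumable" is reduced to "the producing region has finished", as for blocking results, and marking the vertices SCHEDULED here stands in for what DefaultScheduler.allocateSlotsAndDeploy does in the next step.

```java
import java.util.*;

/** Hypothetical model of region gating; not Flink code. */
class RegionGatingSketch {

    enum VertexState { CREATED, SCHEDULED }

    /** A region: its subtasks (in deployment order) and the results it consumes. */
    record Region(List<String> vertices, List<String> consumedResults) {}

    private final Set<String> consumableResults = new HashSet<>();
    private final Map<String, VertexState> states = new HashMap<>();

    /** Called when a producer finishes and its result becomes consumable. */
    void markConsumable(String resultId) {
        consumableResults.add(resultId);
    }

    /** Returns the subtasks handed over for slot allocation, or an empty list if the region must wait. */
    List<String> maybeScheduleRegion(Region region) {
        if (!consumableResults.containsAll(region.consumedResults())) {
            return List.of(); // inputs not ready yet: the region is retried later
        }
        for (String v : region.vertices()) {
            if (states.getOrDefault(v, VertexState.CREATED) != VertexState.CREATED) {
                throw new IllegalStateException(
                        "BUG: trying to schedule a region which is not in CREATED state");
            }
        }
        // Stand-in for allocateSlotsAndDeploy, which moves the vertices to SCHEDULED.
        region.vertices().forEach(v -> states.put(v, VertexState.SCHEDULED));
        return List.copyOf(region.vertices());
    }
}
```

Scheduling a consumer region before its input is consumable is a no-op; once the input is marked consumable the region is handed over, and scheduling the same region a second time trips the CREATED-state check.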

  • Core logic for allocating slots to a group of subtasks and deploying them

Allocate slots and deploy the vertex when slots are returned. Vertices will be deployed only after all of them have been assigned slots. The given order will be respected, i.e. tasks with smaller indices will be deployed earlier. Only vertices in CREATED state will be accepted. Errors will happen if scheduling non-CREATED vertices.

Subtasks with smaller indices are allocated slots first, but deployment starts only after every ExecutionVertex (i.e. every subtask) in the batch has been assigned a slot.

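    @Override
    public void allocateSlotsAndDeploy(
            final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
        // 1. Validate the deployment options
        validateDeploymentOptions(executionVertexDeploymentOptions);

        // 2. Group the deployment options by vertex ID (list -> map)
        final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
                groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);

        // 3. Collect the IDs of all subtasks to deploy
        final List<ExecutionVertexID> verticesToDeploy =
                executionVertexDeploymentOptions.stream()
                        .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                        .collect(Collectors.toList());

        // 4. Record a modification for each subtask (bumps its version, so a deployment
        //    that has become outdated in the meantime can be detected later)
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
                executionVertexVersioner.recordVertexModifications(verticesToDeploy);

        // 5. Move the subtasks to the SCHEDULED state
        transitionToScheduled(verticesToDeploy);

        // 6. Request slots from the ResourceManager (asynchronous)
        final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
                allocateSlots(executionVertexDeploymentOptions);

        // 7. Act once the slot requests complete; on success, deploy the subtasks
        final List<DeploymentHandle> deploymentHandles =
                createDeploymentHandles(
                        requiredVersionByVertex,
                        deploymentOptionsByVertex,
                        slotExecutionVertexAssignments);

        waitForAllSlotsAndDeploy(deploymentHandles);
    }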
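
  • Building the slot sharing groups (fragment of the ExecutionSlotSharingGroupBuilder constructor in LocalInputPreferredSlotSharingStrategy): every JobVertex is first indexed by its SlotSharingGroup and CoLocationGroup, then the maps that track the ExecutionSlotSharingGroups under construction are initialized.

    // JobVertexID -> ID of the SlotSharingGroup it belongs to
    this.slotSharingGroupMap = new HashMap<>();
    for (SlotSharingGroup slotSharingGroup : logicalSlotSharingGroups) {
        for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
            slotSharingGroupMap.put(jobVertexId, slotSharingGroup.getSlotSharingGroupId());
        }
    }

    // JobVertexID -> the CoLocationGroup it belongs to
    this.coLocationGroupMap = new HashMap<>();
    for (CoLocationGroupDesc coLocationGroup : coLocationGroups) {
        for (JobVertexID jobVertexId : coLocationGroup.getVertices()) {
            coLocationGroupMap.put(jobVertexId, coLocationGroup);
        }
    }

    // Bookkeeping for the ExecutionSlotSharingGroups being built
    executionSlotSharingGroupMap = new HashMap<>();
    constraintToExecutionSlotSharingGroupMap = new HashMap<>();
    executionSlotSharingGroups = new HashMap<>();
    // IdentityHashMap: keyed by group object identity, not by equals()
    assignedJobVerticesForGroups = new IdentityHashMap<>();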
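The "wait for every slot, then deploy in the given order" behaviour of allocateSlotsAndDeploy can be sketched with CompletableFuture.allOf. The names are hypothetical, and the sketch skips what Flink additionally does per subtask (assigning each slot as it arrives, checking vertex versions, handling failures); here a failed slot request simply fails the whole returned future.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of "wait for all slots, then deploy"; not Flink code. */
class WaitForAllSlotsSketch {

    /** A subtask paired with its asynchronous slot request. */
    record SlotAssignment(String subtask, CompletableFuture<String> slotFuture) {}

    /**
     * Completes only after every slot future has completed, then "deploys" the subtasks in the
     * given order, regardless of the order in which the slots arrived. If any slot request
     * fails, allOf fails and the deployment step is skipped.
     */
    static CompletableFuture<List<String>> waitForAllSlotsAndDeploy(List<SlotAssignment> assignments) {
        CompletableFuture<?>[] slotFutures = assignments.stream()
                .map(SlotAssignment::slotFuture)
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(slotFutures).thenApply(ignored -> {
            List<String> deployed = new ArrayList<>();
            for (SlotAssignment a : assignments) {
                // Every future is already complete here, so join() returns immediately.
                deployed.add(a.subtask() + "@" + a.slotFuture().join());
            }
            return deployed;
        });
    }
}
```

Even if the slot for map-1 arrives before the slot for map-0, nothing is deployed until both are available, and map-0 is still deployed first.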

[1] FLIP-119: Pipelined Region Scheduling
