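In the previous articles, the TM (TaskManager) obtained its slots and offered them to the JM (JobMaster) for running tasks. This article continues the source-code walkthrough and covers how the JM submits tasks to the TM.
Chapter 2 Detailed steps
2.1 Starting the JM
Let's go back to the JM startup code we saw earlier: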
org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    // Verify that we are running in the main thread
    validateRunsInMainThread();
    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
        return Acknowledge.get();
    }
    setNewFencingToken(newJobMasterId);
    // TODO Actually start the JobMaster (JobManager) services
    startJobMasterServices();
    log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
    // TODO Reset and start the scheduler
    resetAndStartScheduler();
    return Acknowledge.get();
}
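The early return on a matching fencing token is what makes a repeated start request harmless. As a rough sketch of that idea only, with a hypothetical FencedService class and a plain UUID standing in for JobMasterId (neither is Flink code):

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for a fenced endpoint; this is not Flink's JobMaster. */
final class FencedService {
    private UUID fencingToken;   // null until the first start
    private int startCount;      // how many times the real start logic ran

    /** Returns true if this call performed a start, false if it was a repeat. */
    boolean start(UUID newToken) {
        Objects.requireNonNull(newToken, "The new token must not be null.");
        if (Objects.equals(fencingToken, newToken)) {
            return false;        // same token: already started, nothing to do
        }
        fencingToken = newToken; // a different token replaces the old one
        startCount++;            // stands in for starting services and the scheduler
        return true;
    }

    int startCount() {
        return startCount;
    }

    public static void main(String[] args) {
        FencedService service = new FencedService();
        UUID token = UUID.randomUUID();
        System.out.println(service.start(token)); // prints true
        System.out.println(service.start(token)); // prints false
    }
}
```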
In startJobExecution, the call to resetAndStartScheduler is what starts the scheduler, so let's follow it.
2.2 Starting the scheduler
org.apache.flink.runtime.jobmaster.JobMaster#resetAndStartScheduler
private void resetAndStartScheduler() throws Exception {
    // ...
    // TODO Start scheduling
    schedulerAssignedFuture.thenRun(this::startScheduling);
}
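The thenRun call means scheduling only begins once schedulerAssignedFuture has completed. A minimal sketch of that chaining with plain java.util.concurrent; the class name ThenRunSketch and the helper runWhenReady are made up for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class ThenRunSketch {
    /** Runs the action once the given future completes normally; returns the chained future. */
    static CompletableFuture<Void> runWhenReady(CompletableFuture<Void> ready, Runnable action) {
        return ready.thenRun(action);
    }

    public static void main(String[] args) {
        AtomicBoolean started = new AtomicBoolean(false);
        CompletableFuture<Void> schedulerAssigned = new CompletableFuture<>();

        runWhenReady(schedulerAssigned, () -> started.set(true));
        System.out.println(started.get()); // prints false: the future is not complete yet

        schedulerAssigned.complete(null);
        System.out.println(started.get()); // prints true: the action ran on completion
    }
}
```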
org.apache.flink.runtime.jobmaster.JobMaster#startScheduling
private void startScheduling() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    schedulerNG.registerJobStatusListener(jobStatusListener);
    // TODO Start scheduling
    schedulerNG.startScheduling();
}
Implementation of org.apache.flink.runtime.scheduler.SchedulerNG#startScheduling:
org.apache.flink.runtime.scheduler.SchedulerBase#startScheduling
@Override
public final void startScheduling() {
    mainThreadExecutor.assertRunningInMainThread();
    registerJobMetrics();
    startAllOperatorCoordinators();
    // TODO Start the internal scheduling
    startSchedulingInternal();
}
org.apache.flink.runtime.scheduler.DefaultScheduler#startSchedulingInternal
@Override
protected void startSchedulingInternal() {
    log.info("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
    prepareExecutionGraphForNgScheduling();
    // TODO Start scheduling the tasks
    schedulingStrategy.startScheduling();
}
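The two methods above follow the template-method pattern: the base class fixes the common steps in a final method and leaves one hook for the subclass. A simplified sketch of that structure; BaseSchedulerSketch and DefaultSchedulerSketch are illustrative names, and the recorded strings only mimic the calls shown above:

```java
import java.util.ArrayList;
import java.util.List;

abstract class BaseSchedulerSketch {
    protected final List<String> steps = new ArrayList<>();

    // final: subclasses cannot skip or reorder the common steps
    public final void startScheduling() {
        steps.add("registerJobMetrics");
        steps.add("startAllOperatorCoordinators");
        startSchedulingInternal(); // hook implemented by the concrete scheduler
    }

    protected abstract void startSchedulingInternal();

    List<String> steps() {
        return steps;
    }
}

final class DefaultSchedulerSketch extends BaseSchedulerSketch {
    @Override
    protected void startSchedulingInternal() {
        // The concrete scheduler hands over to its scheduling strategy here
        steps.add("schedulingStrategy.startScheduling");
    }

    public static void main(String[] args) {
        DefaultSchedulerSketch scheduler = new DefaultSchedulerSketch();
        scheduler.startScheduling();
        System.out.println(scheduler.steps());
    }
}
```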
2.3 Scheduling tasks
Flink ships several scheduling strategies, each an implementation of org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy.
Here we look at PipelinedRegionSchedulingStrategy.
Implementation of org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy#startScheduling:
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#startScheduling
@Override
public void startScheduling() {
    final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils
        .toStream(schedulingTopology.getAllPipelinedRegions())
        .filter(region -> !region.getConsumedResults().iterator().hasNext())
        .collect(Collectors.toSet());
    // TODO Schedule tasks region by region
    maybeScheduleRegions(sourceRegions);
}
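The filter in startScheduling picks the regions that consume no results, which are the ones that can be scheduled first. A toy version of the same selection, assuming a hypothetical Region class that holds a name and the names of the results it consumes (Flink's real SchedulingPipelinedRegion is richer than this):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class SourceRegionSketch {
    /** Hypothetical region: a name plus the names of the upstream results it consumes. */
    static final class Region {
        final String name;
        final List<String> consumedResults;

        Region(String name, List<String> consumedResults) {
            this.name = name;
            this.consumedResults = consumedResults;
        }
    }

    /** Keeps only the regions that consume nothing. */
    static Set<String> sourceRegionNames(Collection<Region> regions) {
        return regions.stream()
            .filter(region -> region.consumedResults.isEmpty())
            .map(region -> region.name)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<Region> regions = Arrays.asList(
            new Region("source", Collections.emptyList()),
            new Region("map", Arrays.asList("source-output")),
            new Region("sink", Arrays.asList("map-output")));
        System.out.println(sourceRegionNames(regions)); // prints [source]
    }
}
```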
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#maybeScheduleRegions
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
    final List<SchedulingPipelinedRegion> regionsSorted =
        SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
    for (SchedulingPipelinedRegion region : regionsSorted) {
        // TODO Schedule tasks region by region
        maybeScheduleRegion(region);
    }
}
2.4 Deploying tasks
org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#maybeScheduleRegion
private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
    // ...
    // TODO Allocate slots and deploy the tasks
    schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
Implementation of org.apache.flink.runtime.scheduler.SchedulerOperations#allocateSlotsAndDeploy:
org.apache.flink.runtime.scheduler.DefaultScheduler#allocateSlotsAndDeploy
@Override
public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    // ...
    // TODO Wait for the slots, then deploy the tasks
    waitForAllSlotsAndDeploy(deploymentHandles);
}
The slots being waited for here are the ones the TM offered to the JM in the previous article.
org.apache.flink.runtime.scheduler.DefaultScheduler#waitForAllSlotsAndDeploy
private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
    // TODO Wait for the slots, then deploy the tasks
    FutureUtils.assertNoException(
        assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
}
org.apache.flink.runtime.scheduler.DefaultScheduler#deployAll
private BiFunction<Void, Throwable, Void> deployAll(final List<DeploymentHandle> deploymentHandles) {
    return (ignored, throwable) -> {
        propagateIfNonNull(throwable);
        for (final DeploymentHandle deploymentHandle : deploymentHandles) {
            final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
            final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
            checkState(slotAssigned.isDone());
            // TODO Deploy, or handle the error
            FutureUtils.assertNoException(
                slotAssigned.handle(deployOrHandleError(deploymentHandle)));
        }
        return null;
    };
}
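The pattern in waitForAllSlotsAndDeploy and deployAll is: wait until every slot future is done, then walk the list and deploy each one. A rough equivalent using only the JDK, where CompletableFuture.allOf stands in for Flink's own assignAllResources, and WaitAllSketch, deployWhenAllAssigned and the "deployed to" strings are invented for the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class WaitAllSketch {
    /** Completes with one entry per slot once all slot futures are done. */
    static CompletableFuture<List<String>> deployWhenAllAssigned(List<CompletableFuture<String>> slotFutures) {
        CompletableFuture<Void> allAssigned =
            CompletableFuture.allOf(slotFutures.toArray(new CompletableFuture[0]));
        return allAssigned.handle((ignored, throwable) -> {
            if (throwable != null) {
                throw new CompletionException(throwable); // propagate the failure
            }
            List<String> deployed = new ArrayList<>();
            for (CompletableFuture<String> slotFuture : slotFutures) {
                // Every future is already done here, so join() does not block
                deployed.add("deployed to " + slotFuture.join());
            }
            return deployed;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<List<String>> result = deployWhenAllAssigned(Arrays.asList(first, second));

        first.complete("slot-1");
        System.out.println(result.isDone()); // prints false: one slot is still missing
        second.complete("slot-2");
        System.out.println(result.join());   // prints [deployed to slot-1, deployed to slot-2]
    }
}
```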
org.apache.flink.runtime.scheduler.DefaultScheduler#deployOrHandleError
private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
    // ...
}
org.apache.flink.runtime.scheduler.DefaultScheduler#deployTaskSafe
private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
    try {
        final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
        // TODO Deploy the task
        executionVertexOperations.deploy(executionVertex);
    } catch (Throwable e) {
        handleTaskDeploymentFailure(executionVertexId, e);
    }
}
Implementation of org.apache.flink.runtime.scheduler.ExecutionVertexOperations#deploy:
org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations#deploy
@Override
public void deploy(final ExecutionVertex executionVertex) throws JobException {
    // TODO Deploy the execution vertex
    executionVertex.deploy();
}
org.apache.flink.runtime.executiongraph.ExecutionVertex#deploy
public void deploy() throws JobException {
    // TODO Deploy the current execution attempt
    currentExecution.deploy();
}
org.apache.flink.runtime.executiongraph.Execution#deploy
public void deploy() throws JobException {
    // ...
    try {
        // TODO Submit the task to the TM
        CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
            .thenCompose(Function.identity())
            .whenCompleteAsync(
                (ack, failure) -> {
                    if (failure == null) {
                        vertex.notifyCompletedDeployment(this);
                    } else {
                        if (failure instanceof TimeoutException) {
                            String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
                            markFailed(new Exception(
                                "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                    + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                        } else {
                            markFailed(failure);
                        }
                    }
                },
                jobMasterMainThreadExecutor);
    } catch (Throwable t) {
        markFailed(t);
        if (isLegacyScheduling()) {
            ExceptionUtils.rethrow(t);
        }
    }
}
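The chain in Execution#deploy is worth a closer look: supplyAsync wraps a call that itself returns a future, thenCompose(Function.identity()) flattens the nested future, and the final callback runs on a chosen executor. A small sketch of the same shape, assuming a hypothetical Gateway interface and string results in place of Flink's types. Unwrapping CompletionException before the timeout check is this sketch's own choice, not something taken from the Flink code above:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class SubmitSketch {
    /** Hypothetical gateway: the RPC call itself returns a future. */
    interface Gateway {
        CompletableFuture<String> submitTask(String task);
    }

    static CompletableFuture<String> deploy(Gateway gateway, String task, Executor ioExecutor, Executor mainExecutor) {
        return CompletableFuture
            // The supplier returns a future, so this stage holds a future of a future
            .supplyAsync(() -> gateway.submitTask(task), ioExecutor)
            // Flatten it into a single future of the acknowledgement
            .thenCompose(Function.identity())
            // React to the outcome on the "main thread" executor
            .handleAsync((ack, failure) -> {
                if (failure == null) {
                    return "DEPLOYED";
                }
                Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                    ? failure.getCause()
                    : failure;
                if (cause instanceof TimeoutException) {
                    return "FAILED: timeout while deploying " + task;
                }
                return "FAILED: " + cause.getMessage();
            }, mainExecutor);
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs everything in the calling thread
        Gateway healthy = task -> CompletableFuture.completedFuture("ack");
        System.out.println(deploy(healthy, "map (1/2)", direct, direct).join()); // prints DEPLOYED
    }
}
```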
2.5 Executing the task in the TM
The JM calls the TM's method over RPC, and the task then runs inside the TM.
Implementation of org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway#submitTask:
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway#submitTask
@Override
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
    return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
}
Implementation of org.apache.flink.runtime.taskexecutor.TaskExecutorGateway#submitTask:
org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask
@Override
public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd,
        JobMasterId jobMasterId,
        Time timeout) {
    try {
        // ...
        // TODO Create the Task
        Task task = new Task(
            jobInformation,
            taskInformation,
            tdd.getExecutionAttemptId(),
            tdd.getAllocationId(),
            tdd.getSubtaskIndex(),
            tdd.getAttemptNumber(),
            tdd.getProducedPartitions(),
            tdd.getInputGates(),
            tdd.getTargetSlotNumber(),
            memoryManager,
            taskExecutorServices.getIOManager(),
            taskExecutorServices.getShuffleEnvironment(),
            taskExecutorServices.getKvStateService(),
            taskExecutorServices.getBroadcastVariableManager(),
            taskExecutorServices.getTaskEventDispatcher(),
            externalResourceInfoProvider,
            taskStateManager,
            taskManagerActions,
            inputSplitProvider,
            checkpointResponder,
            taskOperatorEventGateway,
            aggregateManager,
            classLoaderHandle,
            fileCache,
            taskManagerConfiguration,
            taskMetricGroup,
            resultPartitionConsumableNotifier,
            partitionStateChecker,
            getRpcService().getExecutor());
        taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED, task::isBackPressured);
        log.info("Received task {} ({}), deploy into slot with allocation id {}.",
            task.getTaskInfo().getTaskNameWithSubtasks(), tdd.getExecutionAttemptId(), tdd.getAllocationId());
        boolean taskAdded;
        try {
            // TODO Add the task to its designated slot, looked up by allocation id
            taskAdded = taskSlotTable.addTask(task);
        } catch (SlotNotFoundException | SlotNotActiveException e) {
            throw new TaskSubmissionException("Could not submit task.", e);
        }
        // TODO If the task was added, start the task thread and run the task
        if (taskAdded) {
            task.startTaskThread();
            setupResultPartitionBookkeeping(
                tdd.getJobId(),
                tdd.getProducedPartitions(),
                task.getTerminationFuture());
            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager already contains a task for id " +
                task.getExecutionId() + '.';
            log.debug(message);
            throw new TaskSubmissionException(message);
        }
    } catch (TaskSubmissionException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
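The tail of submitTask boils down to: register the task, start its thread only if the registration succeeded, and answer with either an acknowledgement or a failed future. A stripped-down sketch of that control flow, in which TaskTableSketch, the string "ACK" and the use of IllegalStateException are all placeholders rather than Flink types:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class TaskTableSketch {
    private final Map<String, Thread> tasksByExecutionId = new ConcurrentHashMap<>();

    /** Registers the task and starts its thread; a duplicate id yields a failed future. */
    CompletableFuture<String> submit(String executionId, Runnable body) {
        Thread thread = new Thread(body, "task-" + executionId);
        boolean added = tasksByExecutionId.putIfAbsent(executionId, thread) == null;
        if (!added) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalStateException("Already contains a task for id " + executionId + '.'));
            return failed;
        }
        thread.start(); // the task now runs in its own thread
        return CompletableFuture.completedFuture("ACK");
    }

    public static void main(String[] args) {
        TaskTableSketch table = new TaskTableSketch();
        System.out.println(table.submit("attempt-1", () -> { }).join());                      // prints ACK
        System.out.println(table.submit("attempt-1", () -> { }).isCompletedExceptionally()); // prints true
    }
}
```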
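At this point the TM actually starts running the task we submitted!
That wraps up the walkthrough of the whole job-submission flow for Flink in per-job mode on YARN. More of the details will be filled in later. Thanks for reading!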



