当用户代码最后调用 env.execute() 时,会依次执行如下代码:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute(java.lang.String)
/**
 * Triggers the program execution under the given job name.
 *
 * <p>Rejects a {@code null} job name, obtains the StreamGraph through
 * {@code getStreamGraph(jobName)} and delegates to the {@code execute(StreamGraph)} overload.
 *
 * @param jobName name of the job; must not be {@code null}
 * @return whatever the {@code execute(StreamGraph)} overload returns
 * @throws Exception propagated from stream graph creation or from the delegated execution
 */
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    // Build the StreamGraph first, then hand it over to the StreamGraph-based overload.
    final StreamGraph graphForJob = getStreamGraph(jobName);
    return execute(graphForJob);
}
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute(org.apache.flink.streaming.api.graph.StreamGraph)
/**
 * Executes the given StreamGraph.
 *
 * <p>The visible part of this excerpt submits the graph through {@code executeAsync} and keeps
 * the returned JobClient; the rest of the method body is elided in this excerpt.
 *
 * @param streamGraph the stream graph to execute
 * @throws Exception propagated from {@code executeAsync}
 */
@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
// Submit the job through the asynchronous execution path.
final JobClient jobClient = executeAsync(streamGraph);
// (remainder of the method omitted in this excerpt)
//...
}
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph)
/**
 * Submits the given StreamGraph through the executor selected by the configuration and returns
 * the resulting JobClient.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>verify that the stream graph and the configured {@code DeploymentOptions.TARGET} are
 *       not {@code null};</li>
 *   <li>look up a PipelineExecutorFactory for the configuration via the executor service
 *       loader;</li>
 *   <li>ask that factory's executor to execute the stream graph, which yields a future of a
 *       JobClient;</li>
 *   <li>block on that future and notify every registered job listener of the outcome.</li>
 * </ol>
 *
 * @param streamGraph the stream graph to submit; must not be {@code null}
 * @return the JobClient produced by the executor's future
 * @throws Exception a FlinkException wrapping the stripped cause when the future completes
 *     exceptionally; any other exception from the lookup, the executor, or the blocking
 *     {@code get()} propagates unchanged
 */
@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // Validate the inputs before touching any executor.
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    checkNotNull(
            configuration.get(DeploymentOptions.TARGET),
            "No execution.target specified in your configuration file.");

    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
            executorFactory,
            "Cannot find compatible factory for specified execution.target (=%s)",
            configuration.get(DeploymentOptions.TARGET));

    // Hand the stream graph to the executor. The future is typed as CompletableFuture<JobClient>
    // (not a raw CompletableFuture) so that get() below yields a JobClient rather than Object.
    final CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    .execute(streamGraph, configuration, userClassloader);

    try {
        JobClient jobClient = jobClientFuture.get();
        // Success: listeners receive the client and a null error.
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
    } catch (ExecutionException executionException) {
        final Throwable strippedException =
                ExceptionUtils.stripExecutionException(executionException);
        // Failure: listeners receive a null client and the stripped cause.
        jobListeners.forEach(
                jobListener -> jobListener.onJobSubmitted(null, strippedException));

        throw new FlinkException(
                String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                strippedException);
    }
}
到这里,上面通过 getExecutor(configuration) 得到的执行器所调用的 execute 是一个接口方法。这里以 YarnJobClusterExecutor 为例,它的 execute 实现位于父类 AbstractJobClusterExecutor 中。
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor#execute
@Override public CompletableFutureexecute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception { //TODO Client端StreamGraph(实现了Pipeline接口的类)转换成JobGraph final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); //TODO 集群描述器 Yarn和Flink的集群环境信息 try (final ClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); //TODO 获取集群配置 JM内存 TM内存 TM中Slot个数等信息 final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); //TODO 部署并启动集群中的各个进程 final ClusterClientProvider clusterClientProvider = clusterDescriptor .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); return CompletableFuture.completedFuture( new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID(), userCodeClassloader)); } }
这就是用户程序最后调用 env.execute() 时所做的事情。



