
Flink Source Code No.3 - Job Submission: Executing the User Job (per-job on YARN)

Chapter 1 Executing the User Program

When the user code finally calls env.execute(), the following code runs:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute(java.lang.String)

public JobExecutionResult execute(String jobName) throws Exception {
	Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
	//TODO Obtain the StreamGraph
	return execute(getStreamGraph(jobName));
}

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute(org.apache.flink.streaming.api.graph.StreamGraph)

@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
	//TODO Execute asynchronously
	final JobClient jobClient = executeAsync(streamGraph);

	//...
}

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph) 

@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
	//TODO Check and validate the streamGraph
	checkNotNull(streamGraph, "StreamGraph cannot be null.");
	checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

	final PipelineExecutorFactory executorFactory =
		executorServiceLoader.getExecutorFactory(configuration);

	checkNotNull(
		executorFactory,
		"Cannot find compatible factory for specified execution.target (=%s)",
		configuration.get(DeploymentOptions.TARGET));

	//TODO Execute
	CompletableFuture<JobClient> jobClientFuture = executorFactory
		.getExecutor(configuration)
		.execute(streamGraph, configuration, userClassloader);

	try {
		JobClient jobClient = jobClientFuture.get();
		jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
		return jobClient;
	} catch (ExecutionException executionException) {
		final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
		jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));

		throw new FlinkException(
			String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
			strippedException);
	}
}
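The tail of executeAsync blocks on jobClientFuture.get() until submission finishes, then notifies every registered JobListener with either the JobClient or the failure, after unwrapping the ExecutionException. The JDK-only sketch below models just that control flow. It is not Flink code: SubmissionListener, awaitAndNotify, and the String stand-in for JobClient are hypothetical; it throws an unchecked RuntimeException where Flink throws the checked FlinkException; and stripExecutionException here is a simplified helper written for the sketch (it unwraps nested ExecutionExceptions to their cause), not a copy of Flink's ExceptionUtils.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch (not Flink code) of "wait for submission, then notify listeners". */
public class ListenerNotificationSketch {

    /** Hypothetical listener with the same two-argument shape as JobListener#onJobSubmitted. */
    interface SubmissionListener {
        void onJobSubmitted(String jobClient, Throwable throwable);
    }

    /** Simplified helper: unwraps nested ExecutionExceptions down to the first other cause. */
    static Throwable stripExecutionException(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof ExecutionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Blocks until submission finishes, then tells every listener about success or failure. */
    static String awaitAndNotify(CompletableFuture<String> jobClientFuture,
                                 List<SubmissionListener> listeners) {
        try {
            String jobClient = jobClientFuture.get();
            listeners.forEach(l -> l.onJobSubmitted(jobClient, null));
            return jobClient;
        } catch (ExecutionException executionException) {
            Throwable stripped = stripExecutionException(executionException);
            listeners.forEach(l -> l.onJobSubmitted(null, stripped));
            throw new RuntimeException("Failed to execute job.", stripped);
        } catch (InterruptedException interrupted) {
            // Sketch-only handling: restore the interrupt flag (the original lets it propagate).
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        List<SubmissionListener> listeners = List.of(
                (client, error) -> events.add(client != null ? "submitted " + client : "failed: " + error.getMessage()));

        awaitAndNotify(CompletableFuture.completedFuture("client-1"), listeners);

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("no cluster"));
        try {
            awaitAndNotify(failed, listeners);
        } catch (RuntimeException e) {
            events.add("caller saw " + e.getCause().getClass().getSimpleName());
        }
        System.out.println(events); // [submitted client-1, failed: no cluster, caller saw IllegalStateException]
    }
}
```

Note that listeners are told about a failure before the exception reaches the caller, so a listener sees the unwrapped root cause rather than the ExecutionException wrapper.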

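At this point the execute being called is an interface method (PipelineExecutor#execute), so we look at the implementation used by YarnJobClusterExecutor, which it inherits from its parent class AbstractJobClusterExecutor.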
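Which executor runs is decided earlier, by executorServiceLoader.getExecutorFactory(configuration) in executeAsync: candidate factories are asked whether they are compatible with the configured execution.target, and for per-job mode on YARN that target is yarn-per-job, which YarnJobClusterExecutorFactory accepts. The sketch below models only this selection rule and is a simplification, not Flink's DefaultExecutorServiceLoader: a plain List replaces java.util.ServiceLoader discovery, a Map<String, String> replaces Configuration, and ExecutorFactory, forTarget, and select are hypothetical names. Rejecting zero or several matches reflects our reading of Flink's loader, which is treated here as an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch (not Flink code) of picking an executor factory by execution.target. */
public class ExecutorSelectionSketch {

    /** Stand-in for PipelineExecutorFactory: a name plus a compatibility check. */
    interface ExecutorFactory {
        String name();
        boolean isCompatibleWith(Map<String, String> configuration);
    }

    /** Builds a factory that accepts a configuration whose execution.target equals its name. */
    static ExecutorFactory forTarget(String target) {
        return new ExecutorFactory() {
            @Override
            public String name() {
                return target;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                // equalsIgnoreCase(null) is false, so a missing target matches nothing.
                return target.equalsIgnoreCase(configuration.get("execution.target"));
            }
        };
    }

    /** Returns the single compatible factory, failing on zero or ambiguous matches. */
    static ExecutorFactory select(List<ExecutorFactory> candidates, Map<String, String> configuration) {
        List<ExecutorFactory> compatible = candidates.stream()
                .filter(f -> f.isCompatibleWith(configuration))
                .collect(Collectors.toList());
        if (compatible.size() > 1) {
            throw new IllegalStateException("Multiple compatible executor factories found.");
        }
        if (compatible.isEmpty()) {
            throw new IllegalStateException(
                    "No executor factory found for target " + configuration.get("execution.target"));
        }
        return compatible.get(0);
    }

    public static void main(String[] args) {
        List<ExecutorFactory> factories = List.of(
                forTarget("local"), forTarget("remote"), forTarget("yarn-session"), forTarget("yarn-per-job"));
        ExecutorFactory chosen = select(factories, Map.of("execution.target", "yarn-per-job"));
        System.out.println(chosen.name()); // yarn-per-job
    }
}
```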

org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor#execute

@Override
public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
	//TODO Client side: convert the StreamGraph (a class implementing the Pipeline interface) into a JobGraph
	final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

	//TODO Cluster descriptor: environment information for YARN and the Flink cluster
	try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);

		//TODO Get the cluster specification: JM memory, TM memory, number of slots per TM, etc.
		final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);

		//TODO Deploy and start the processes of the cluster
		final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
				.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
		LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

		return CompletableFuture.completedFuture(
				new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
	}
}
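AbstractJobClusterExecutor#execute runs its steps in a fixed order: translate the StreamGraph into a JobGraph, open a ClusterDescriptor, read the ClusterSpecification, deploy a dedicated cluster for this one job, and wrap the result in an already-completed future. The JDK-only sketch below mirrors that order so the resource handling is easy to follow. Every type in it (JobGraph, ClusterSpec, FakeClusterDescriptor) is a hypothetical stand-in, memory sizes are plain MB integers rather than Flink's memory-size strings, and the 1024/1024/1 defaults are placeholders, not Flink's defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch (not Flink code) of the per-job execute flow. */
public class PerJobFlowSketch {

    /** Simplified JobGraph: just an id and a name. */
    static final class JobGraph {
        final String jobId;
        final String jobName;
        JobGraph(String jobId, String jobName) { this.jobId = jobId; this.jobName = jobName; }
    }

    /** Simplified cluster specification; sizes are plain MB integers. */
    static final class ClusterSpec {
        final int masterMemoryMB;
        final int taskManagerMemoryMB;
        final int slotsPerTaskManager;
        ClusterSpec(int jm, int tm, int slots) {
            this.masterMemoryMB = jm; this.taskManagerMemoryMB = tm; this.slotsPerTaskManager = slots;
        }
    }

    /** Stand-in for ClusterDescriptor; AutoCloseable so try-with-resources releases it. */
    static final class FakeClusterDescriptor implements AutoCloseable {
        private final List<String> events;
        FakeClusterDescriptor(List<String> events) { this.events = events; }

        /** Pretends to start a dedicated cluster for one job and returns a client handle. */
        String deployJobCluster(ClusterSpec spec, JobGraph jobGraph, boolean detached) {
            events.add("deploy " + jobGraph.jobId + " jm=" + spec.masterMemoryMB
                    + " tm=" + spec.taskManagerMemoryMB + " slots=" + spec.slotsPerTaskManager
                    + " detached=" + detached);
            return "client:" + jobGraph.jobId;
        }

        @Override
        public void close() { events.add("close descriptor"); }
    }

    /** Hypothetical stand-in for PipelineExecutorUtils.getJobGraph. */
    static JobGraph toJobGraph(String pipelineName) {
        return new JobGraph("job-" + pipelineName, pipelineName);
    }

    /** Reads sizing from a config map, falling back to placeholder values. */
    static ClusterSpec getClusterSpecification(Map<String, Integer> conf) {
        return new ClusterSpec(
                conf.getOrDefault("jobmanager.memory.process.size", 1024),
                conf.getOrDefault("taskmanager.memory.process.size", 1024),
                conf.getOrDefault("taskmanager.numberOfTaskSlots", 1));
    }

    static CompletableFuture<String> execute(String pipelineName, Map<String, Integer> conf,
                                             boolean detached, List<String> events) {
        JobGraph jobGraph = toJobGraph(pipelineName);                                   // 1. translate
        try (FakeClusterDescriptor descriptor = new FakeClusterDescriptor(events)) {   // 2. descriptor
            ClusterSpec spec = getClusterSpecification(conf);                          // 3. sizing
            String client = descriptor.deployJobCluster(spec, jobGraph, detached);     // 4. deploy
            return CompletableFuture.completedFuture(client);                          // 5. completed future
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> future =
                execute("wordcount", Map.of("taskmanager.numberOfTaskSlots", 2), true, events);
        System.out.println(future.isDone() + " " + future.join());
        System.out.println(events);
    }
}
```

Because the returned future is created with CompletableFuture.completedFuture only after deployJobCluster has returned, the jobClientFuture.get() call back in executeAsync finds it already completed; any waiting for the deployment happens inside deployJobCluster itself. The try-with-resources block also closes the ClusterDescriptor before execute returns.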

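That is what happens when the user program finally calls env.execute(): the StreamGraph is built, a PipelineExecutor is chosen according to execution.target, the StreamGraph is converted into a JobGraph on the client, and a dedicated per-job cluster is deployed on YARN to run it.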

Please credit when reposting: article reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/304577.html