本系列文章以一次任务提交为主线,从Flink源码的角度,为大家介绍:Flink任务是如何提交的、程序如何调用用户编写的作业代码、Flink如何启动其内部各个组件、又是如何申请资源,这一系列任务提交相关内容。以下内容基于yarn以per-job模式为例。源码版本Apache Flink 1.12。相关代码解释请注意代码块中的注释。
本篇文章先为大家简单介绍提交作业后,Flink任务执行的入口,和参数的解析。
第2章 flink提交作业 2.1 执行flink run命令我们已flink命令提交job为入口,开始了解源码。通常我们使用flink提交作业时,在命令行输入如下命令:
bin/flink run -t yarn-per-job /.../***.jar ...
查看flink脚本文件,此脚本文件最后exec就是作业提交的入口。
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec $JAVA_RUN $JVM_ARGS $Flink_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
最终执行的是org.apache.flink.client.cli.CliFrontend
2.2 CliFrontend main方法org.apache.flink.client.cli.CliFrontend#main
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory TODO 获取conf目录路径
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration TODO 根据conf目录获取全局配置
final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines TODO 加载用户命令行参数 /flink run -t ... -C...
final List customCommandLines = loadCustomCommandLines(
configuration,
configurationDirectory);
try {
final CliFrontend cli = new CliFrontend(
configuration,
customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
//TODO run运行
int retCode = SecurityUtils.getInstalledContext()
.runSecured(() -> cli.parseParameters(args));
System.exit(retCode);
}
catch (Throwable t) {
final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
System.exit(31);
}
}
后续由parseParameters方法进行解析也提交。
第3章 参数解析与执行 3.1 parseParameters方法解析参数并运行org.apache.flink.client.cli.CliFrontend#parseParameters
public int parseParameters(String[] args) {
// check for action
if (args.length < 1) {
CliFrontendParser.printHelp(customCommandLines);
System.out.println("Please specify an action.");
return 1;
}
// get action TODO 第一个参数
String action = args[0];
// remove action from parameters TODO 第一个参数后面的所有参数
final String[] params = Arrays.copyOfRange(args, 1, args.length);
try {
// do action
switch (action) {
case ACTION_RUN:
//TODO run
run(params);
return 0;
//...
}
} catch (CliArgsException ce) {
return handleArgException(ce);
} catch (ProgramParametrizationException ppe) {
return handleParametrizationException(ppe);
} catch (ProgramMissingJobException pmje) {
return handleMissingJobException();
} catch (Exception e) {
return handleError(e);
}
}
到这里启动的部分就简单介绍完了,接下来的文章继续往后看,下篇文章介绍Flink是如何调用用户的作业代码。
今天是2021年国庆假期最后一天,明天又要开工了,大家准备好了吗?在这里祝所有读者开工顺利!



