栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink作业提交源码解析(1)-命令行解析及运行

flink作业提交源码解析(1)-命令行解析及运行

flink作业提交源码解析(1)-命令行解析及运行

原文链接

版本

flink: release-1.14
os: ubuntu 16.04
IDE: IDEA

从bin/flink 这个提交脚本最后一行

exec "${JAVA_RUN}" $JVM_ARGS $Flink_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

可以看出,入口类为org.apache.flink.client.cli.CliFrontend。

开启远程调试功能,在org.apache.flink.client.cli.CliFrontend的main函数中打断点。

总流程

flink-client 入口: CliFrontend.java

 
    public static void main(final String[] args) {
        //加载环境变量,包括 code revision, current user, Java version, Hadoop version, JVM parameters
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        int retCode = 31;
        try {
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
        } finally {
            System.exit(retCode);
        }
    }

主要分为4个步骤:

    加载环境变量,包括 code revision, current user, Java version, Hadoop version, JVM parameters等,然后从环境变量中获取到flink的配置目录

    解析配置,读取``flink-conf.yaml`,如下图所示

    初始化命令行解析入口,这里包含了一下三种命令行解析入口类

    //运行bin/flink run-application xx 或者bin/flink run -t xxx的 ,
    //-t包含了"remote", "local", "kubernetes-session","yarn-per-job", "yarn-session"等模式
    org.apache.flink.client.cli.GenericCLI
    //提交到yarn session集群的命令行解析入口
    org.apache.flink.yarn.cli.FlinkYarnSessionCli
    //默认的命令行解析入口类,提交到standalone集群
    org.apache.flink.client.cli.DefaultCLI
    

    初始化CliFrontend,执行cli.parseAndRun(args),完成flink client的主要逻辑,其中我们最需要关注的,便是cli.parseAndRun(args)做了些什么事情。

命令行解析和运行

进入parseAndRun函数中,可以看到程序通过解析第一个参数,来判断进入进入哪种具体的逻辑,然后把args[1:]的作为参数params传入具体函数里。这里我们使用第一种,即进入run(params)中。

public int parseAndRun(String[] args) {
        // check for action
        if (args.length < 1) {
            CliFrontendParser.printHelp(customCommandLines);
            System.out.println("Please specify an action.");
            return 1;
        }
        // get action
        String action = args[0];
        // remove action from parameters
        final String[] params = Arrays.copyOfRange(args, 1, args.length);
        try {
            // do action
            switch (action) {
                case ACTION_RUN:
                    run(params);
                    return 0;
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                case ACTION_INFO:
                    info(params);
                    return 0;
                case ACTION_CANCEL:
                    cancel(params);
                    return 0;
                case ACTION_STOP:
                    stop(params);
                    return 0;
                case ACTION_SAVEPOINT:
                    savepoint(params);
                    return 0;
   ...
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");
	//1. 获取支持的参数
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    //2. 解析args,封装到命令行工具类里
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));
	//3. 使用解析到的commandLine去生成ProgramOptions
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);
	//4. 提取作业的入口jar包
    final List jobJars = getJobJarAndDependencies(programOptions);
	//5. 提取有效的配置
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
	//6. 构建PackagedProgram
    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        //7. 执行用户jar包逻辑以及提交作业
        executeProgram(effectiveConfiguration, program);
    }
}

run函数包含了以下步骤

    获取DefaultCLI这个命令行解析入口类支持的参数,包含以下参数,所有的参数都封装在org.apache.commons.cli.Options里。

    解析args,将解析结果封装到命令行工具类里,如下图

bin/flink run -d -p 1 -m 10.219.57.87:8081  
-C file:/home/jinyaqia/blink-test/flink-connector-kafka_2.12-1.14.2.jar 
-C file:/home/jinyaqia/blink-test/flink-text-0.1.0-SNAPSHOT.jar 
/home/jinyaqia/blink-test/flink-sql-submit-0.1.0-SNAPSHOT.jar
-name query_6311_dw_liangyu_211112104148
-f /home/jinyaqia/blink-test/114kafka.sql


3. 生成ProgramOptions

public class ProgramOptions extends CommandLineOptions {
	//用户入口jar包
    private String jarFilePath;
	//用户入口类,可以用-c指定,也可以打包入口jar包的时候,指定入口类
    protected String entryPointClass;
	//通过-C指定的classpath
    private final List classpaths;
	//用户入口类的参数
    private final String[] programArgs;
	//-p指定的并行度
    private final int parallelism;
	//用-d表示detached模式,即提交作业后,退出client
    private final boolean detachedMode;
    private final boolean shutdownOnAttachedExit;
	//savapoint设置
    private final SavepointRestoreSettings savepointSettings;

    提取作业的入口jar包

    提取有效的配置到Configuration

    构建PackagedProgram,该类用于描述用户提交的作业及其依赖的文件等。
//包含以下属性
//用户入口jar包
private final URL jarFile;
//入口jar包的参数
private final String[] args;
//入口类
private final Class mainClass;
//其他资源文件
private final List extractedTempLibraries;
//-C传入的classpath
private final List classpaths;
//用户类加载器
private final URLClassLoader userCodeClassLoader;
//savepoint设置
private final SavepointRestoreSettings savepointSettings;

    执行用户jar包代码逻辑以及提交作业

    protected void executeProgram(final Configuration configuration, final PackagedProgram program)
            throws ProgramInvocationException {
        ClientUtils.executeProgram(
                new DefaultExecutorServiceLoader(), configuration, program, false, false);
    }
    

    具体的执行逻辑如下:

    从PackagedProgram中获取用于加载用户代码的类加载器,这里是ChildFirstClassLoader,该类可以在flink-conf.yaml中配置classloader.resolve-order,默认为child-first设置当前执行线程的classloader为ChildFirstClassLoader使用当前类加载器和配置去初始化ContextEnvironment和StreamContextEnvironment,这两种ExecutionEnvironment在运行用户代码的时候会用到,用户代码中的 getExecutionEnvironment 会返回该 Environment运行用户代码

重设为原来的classloader,即AppClassLoader

//ClientUtils.java
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));
		//用户代码中的 getExecutionEnvironment 会返回该 Environment
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
用户代码的运行

在运行用户代码的时候,初始化了ContextEnvironment和StreamContextEnvironment。调用callMainMethod方法。

public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
    try {
        callMainMethod(mainClass, args);
    } finally {
        FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
    }
}

callMainMethod使用了反射机制,去运行用户代码的入口类。

private static void callMainMethod(Class entryClass, String[] args)
        throws ProgramInvocationException {
    Method mainMethod;
    if (!Modifier.isPublic(entryClass.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " must be public.");
    }

    try {
        mainMethod = entryClass.getMethod("main", String[].class);
    } catch (NoSuchMethodException e) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " has no main(String[]) method.");
    } catch (Throwable t) {
        throw new ProgramInvocationException(
                "Could not look up the main(String[]) method from the class "
                        + entryClass.getName()
                        + ": "
                        + t.getMessage(),
                t);
    }

    if (!Modifier.isStatic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " declares a non-static main method.");
    }
    if (!Modifier.isPublic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException(
                "The class " + entryClass.getName() + " declares a non-public main method.");
    }

    try {
        mainMethod.invoke(null, (Object) args);
    } catch (IllegalArgumentException e) {
        throw new ProgramInvocationException(
                "Could not invoke the main method, arguments are not matching.", e);
    } catch (IllegalAccessException e) {
        throw new ProgramInvocationException(
                "Access to the main method was denied: " + e.getMessage(), e);
    } catch (InvocationTargetException e) {
        Throwable exceptionInMethod = e.getTargetException();
        if (exceptionInMethod instanceof Error) {
            throw (Error) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramParametrizationException) {
            throw (ProgramParametrizationException) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramInvocationException) {
            throw (ProgramInvocationException) exceptionInMethod;
        } else {
            throw new ProgramInvocationException(
                    "The main method caused an error: " + exceptionInMethod.getMessage(),
                    exceptionInMethod);
        }
    } catch (Throwable t) {
        throw new ProgramInvocationException(
                "An error occurred while invoking the program's main method: " + t.getMessage(),
                t);
    }
}
参考

Flink Client 实现原理与源码解析(保姆级教学)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/730407.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号