Flink Source Code Analysis: How the JobGraph Is Generated

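Background

When I wrote Flink jobs, the pattern was always the same: however elaborate the middle part, the program starts by obtaining an execution environment and ends with env.execute(). I knew that this method is what yields the job's computation graph when the job is submitted, but I had always wondered exactly when that graph is generated.

Source code analysis: creating the JobGraph

When a job is submitted, one of the steps is to obtain the job's JobGraph: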

   
    public static JobGraph createJobGraph(
            PackagedProgram packagedProgram,
            Configuration configuration,
            int defaultParallelism,
            @Nullable JobID jobID,
            boolean suppressOutput)
            throws ProgramInvocationException {
        final Pipeline pipeline =
                getPipelineFromProgram(
                        packagedProgram, configuration, defaultParallelism, suppressOutput);
        final JobGraph jobGraph =
                FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
                        packagedProgram.getUserCodeClassLoader(),
                        pipeline,
                        configuration,
                        defaultParallelism);
        if (jobID != null) {
            jobGraph.setJobID(jobID);
        }
        jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
        jobGraph.setClasspaths(packagedProgram.getClasspaths());
        jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());

        return jobGraph;
    }

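As we can see, a Pipeline is produced first: a plain structure that only describes the chain of data operations. Combining it with more concrete details such as the configuration and the parallelism yields the JobGraph. So the main question is how the Pipeline is generated:

Creating the Pipeline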
public static Pipeline getPipelineFromProgram(
            PackagedProgram program,
            Configuration configuration,
            int parallelism,
            boolean suppressOutput)
            throws CompilerException, ProgramInvocationException {
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();

        Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());

        final PrintStream originalOut = System.out;
        final PrintStream originalErr = System.err;
        final ByteArrayOutputStream stdOutBuffer;
        final ByteArrayOutputStream stdErrBuffer;

        if (suppressOutput) {
            // temporarily write STDERR and STDOUT to a byte array.
            stdOutBuffer = new ByteArrayOutputStream();
            System.setOut(new PrintStream(stdOutBuffer));
            stdErrBuffer = new ByteArrayOutputStream();
            System.setErr(new PrintStream(stdErrBuffer));
        } else {
            stdOutBuffer = null;
            stdErrBuffer = null;
        }

        // temporary hack to support the optimizer plan preview
        OptimizerPlanEnvironment benv =
                new OptimizerPlanEnvironment(
                        configuration, program.getUserCodeClassLoader(), parallelism);
        benv.setAsContext();
    
        // for now, focus only on StreamPlanEnvironment, which handles streaming jobs
        StreamPlanEnvironment senv =
                new StreamPlanEnvironment(
                        configuration, program.getUserCodeClassLoader(), parallelism);
    
        // install it as the context environment
        senv.setAsContext();

        try {
            // reflectively invoke the main method of the entry class in the user's jar
            program.invokeInteractiveModeForExecution();
        } catch (Throwable t) {
            if (benv.getPipeline() != null) {
                return benv.getPipeline();
            }
            // Question: how did this pipeline get here? Why does reflectively invoking the user jar produce it? See below.
            if (senv.getPipeline() != null) {
                return senv.getPipeline();
            }

            if (t instanceof ProgramInvocationException) {
                throw t;
            }

            throw generateException(
                    program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer);
        } finally {
            benv.unsetAsContext();
            senv.unsetAsContext();
            if (suppressOutput) {
                System.setOut(originalOut);
                System.setErr(originalErr);
            }
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }

        throw generateException(
                program,
                "The program plan could not be fetched - the program aborted pre-maturely.",
                null,
                stdOutBuffer,
                stdErrBuffer);
    }
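
The suppressOutput branch and the finally block above follow a common pattern: swap a piece of global state (System.out, the thread's context class loader), run foreign code, and always restore the original values. The following is a minimal, standalone sketch of that pattern and not Flink code; the class name SuppressedOutputDemo and the helper captureStdOut are made up for illustration, and the demo class's own loader stands in for the user-code class loader.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

class SuppressedOutputDemo {

    /** Runs the action with System.out redirected into a buffer and returns what it printed. */
    static String captureStdOut(Runnable action) {
        final PrintStream originalOut = System.out;
        final ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        final PrintStream redirected = new PrintStream(buffer, true);

        System.setOut(redirected);
        // Stand-in for program.getUserCodeClassLoader() in the listing above.
        Thread.currentThread().setContextClassLoader(SuppressedOutputDemo.class.getClassLoader());
        try {
            action.run();
        } finally {
            // Restore the global state even if the action throws.
            System.setOut(originalOut);
            Thread.currentThread().setContextClassLoader(originalLoader);
        }
        redirected.flush();
        return buffer.toString();
    }

    public static void main(String[] args) {
        String captured = captureStdOut(() -> System.out.print("hello from user code"));
        System.out.println("captured: " + captured); // prints "captured: hello from user code"
    }
}
```

Keeping the buffer around after the restore is what allows the captured output to be attached to an error message later, which is what the stdOutBuffer and stdErrBuffer arguments to generateException suggest.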
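Setting the context

Before the main method of the user's jar is invoked, the StreamPlanEnvironment is first installed as the context:

// for now, focus only on StreamPlanEnvironment, which handles streaming jobs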
StreamPlanEnvironment senv =
        new StreamPlanEnvironment(
                configuration, program.getUserCodeClassLoader(), parallelism);
// step into this call to see what it does
senv.setAsContext();
org.apache.flink.client.program.StreamPlanEnvironment#setAsContext
public void setAsContext() {
        StreamExecutionEnvironmentFactory factory =
                conf -> {
                    this.configure(conf, getUserClassloader());
                    return this;
                };
        initializeContextEnvironment(factory);
    }
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#initializeContextEnvironment
// Important: the fields initialized here belong to org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
    }

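Keep these two fields in mind: they are what ties the reflectively invoked main method of the user's jar, and the execute call at its end, back to this environment.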

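Reflection: the user code is invoked

Once the context is set, the main method of the user's jar is invoked. Ignoring the job's actual logic for now, we focus on just two parts:

Obtaining the context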
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
Let's step in and see what it does:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getExecutionEnvironment
// Important: this method also lives in org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }

Notice anything? This is the same class in which the context was just set, and threadLocalContextEnvironmentFactory and contextEnvironmentFactory are two static fields:

private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;


private static final ThreadLocal<StreamExecutionEnvironmentFactory>
        threadLocalContextEnvironmentFactory = new ThreadLocal<>();

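What does this tell us? The environment obtained in the user code is produced by the context factory installed earlier, and what that factory returns is the StreamPlanEnvironment itself.
This is how, at deployment time, Pipeline generation gets hooked into the user code.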
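
To make the hook concrete, here is a small standalone model of the mechanism, written without any Flink dependency. Everything in it is a stand-in: Env plays the role of StreamExecutionEnvironment, a Supplier<Env> plays the role of StreamExecutionEnvironmentFactory, and the lookup order (thread-local factory first, then the static one, then a local fallback) is my reading of the getExecutionEnvironment listing above, not a copy of Flink's Utils.resolveFactory.

```java
import java.util.Optional;
import java.util.function.Supplier;

class ContextLookupDemo {

    /** Hypothetical stand-in for StreamExecutionEnvironment. */
    static class Env {
        private static Supplier<Env> contextFactory = null;
        private static final ThreadLocal<Supplier<Env>> threadLocalFactory = new ThreadLocal<>();

        final String kind;

        Env(String kind) {
            this.kind = kind;
        }

        /** Called by the client before the user's main method runs. */
        static void initializeContext(Supplier<Env> factory) {
            contextFactory = factory;
            threadLocalFactory.set(factory);
        }

        /** Called by the client afterwards, typically from a finally block. */
        static void resetContext() {
            contextFactory = null;
            threadLocalFactory.remove();
        }

        /** Called by user code: use an installed factory if there is one, else a local environment. */
        static Env getExecutionEnvironment() {
            Supplier<Env> local = threadLocalFactory.get();
            return Optional.ofNullable(local != null ? local : contextFactory)
                    .map(Supplier::get)
                    .orElseGet(() -> new Env("local"));
        }
    }

    public static void main(String[] args) {
        // No factory installed: user code gets a local environment.
        System.out.println(Env.getExecutionEnvironment().kind); // prints "local"

        Env plan = new Env("plan");
        Env.initializeContext(() -> plan);
        try {
            // Factory installed: user code gets exactly the instance the client created.
            System.out.println(Env.getExecutionEnvironment() == plan); // prints "true"
        } finally {
            Env.resetContext();
        }
    }
}
```

The point of the design is that user code never has to know who is running it: the same getExecutionEnvironment() call returns a local environment in the IDE and the client's own instance during submission.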

execute

env.execute();
Now that we know env is actually the StreamPlanEnvironment, let's look directly at its implementation:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute(java.lang.String) 
public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

        // build the StreamGraph and pass it to execute
        return execute(getStreamGraph(jobName));
    }

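Next, follow the execute(StreamGraph) call. The first two listings below are the StreamContextEnvironment overrides; the StreamPlanEnvironment override of executeAsync, shown last, is the one that matters here: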

org.apache.flink.client.program.StreamContextEnvironment#execute

    @Override
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        // keep following this call:
        final JobClient jobClient = executeAsync(streamGraph);
        final List<JobListener> jobListeners = getJobListeners();

        try {
            final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
            return jobExecutionResult;
        } catch (Throwable t) {
            jobListeners.forEach(
                    jobListener ->
                            jobListener.onJobExecuted(
                                    null, ExceptionUtils.stripExecutionException(t)));
            ExceptionUtils.rethrowException(t);

            // never reached, only make javac happy
            return null;
        }
    }
org.apache.flink.client.program.StreamContextEnvironment#executeAsync


  @Override
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        validateAllowedExecution();
        // keep following this call
        final JobClient jobClient = super.executeAsync(streamGraph);

        if (!suppressSysout) {
            System.out.println("Job has been submitted with JobID " + jobClient.getJobID());
        }

        return jobClient;
    }
org.apache.flink.client.program.StreamPlanEnvironment#executeAsync

@Override
    public JobClient executeAsync(StreamGraph streamGraph) {
        // the StreamGraph is the pipeline
        pipeline = streamGraph;

        // do not go on with anything now!
        throw new ProgramAbortException();
    }
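
The capture-then-abort trick can be reproduced in a few lines of plain Java. The sketch below is a toy model with hypothetical names (AbortSignal, PlanOnlyEnv, userMain); it is not Flink code. It also illustrates one assumption worth checking against your Flink version: as far as I know, ProgramAbortException is declared as an Error rather than an Exception, which would explain why the caller catches Throwable and why a defensive catch (Exception e) in user code cannot swallow the signal.

```java
class AbortSignalDemo {

    /** Hypothetical abort signal. It is an Error, so catch (Exception e) does not intercept it. */
    static final class AbortSignal extends Error {
        AbortSignal() {
            super("plan captured, aborting");
        }
    }

    /** Hypothetical plan-only environment: remembers the graph and never submits it. */
    static class PlanOnlyEnv {
        private String capturedGraph;

        String getCapturedGraph() {
            return capturedGraph;
        }

        void executeAsync(String graph) {
            capturedGraph = graph;   // 1. remember the plan
            throw new AbortSignal(); // 2. stop before anything is submitted
        }
    }

    /** User-style code that defensively wraps execute in catch (Exception e). */
    static void userMain(PlanOnlyEnv env) {
        try {
            env.executeAsync("source -> map -> sink");
        } catch (Exception e) {
            System.out.println("not reached: AbortSignal is not an Exception");
        }
    }

    public static void main(String[] args) {
        PlanOnlyEnv env = new PlanOnlyEnv();
        try {
            userMain(env);
        } catch (AbortSignal expected) {
            System.out.println("captured: " + env.getCapturedGraph()); // prints "captured: source -> map -> sink"
        }
    }
}
```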

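I used to find this puzzling: at deployment time the client should only be fetching the job graph, yet if the normal path were followed, wouldn't the job actually be started? The timing would be wrong. Only now did I realize that once the pipeline has been captured, an exception is thrown to stop the submission from going any further. Now look back at the Pipeline-creation code: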

 try {
            // reflectively invoke the main method of the entry class in the user's jar
            program.invokeInteractiveModeForExecution();
        } catch (Throwable t) {
            if (benv.getPipeline() != null) {
                return benv.getPipeline();
            }
            // Answer to the earlier question: executeAsync stored the StreamGraph in senv before throwing, so it is available here
            if (senv.getPipeline() != null) {
                return senv.getPipeline();
            }

            if (t instanceof ProgramInvocationException) {
                throw t;
            }

            throw generateException(
                    program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer);
        } finally {
            benv.unsetAsContext();
            senv.unsetAsContext();
            if (suppressOutput) {
                System.setOut(originalOut);
                System.setErr(originalErr);
            }
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }

        throw generateException(
                program,
                "The program plan could not be fetched - the program aborted pre-maturely.",
                null,
                stdOutBuffer,
                stdErrBuffer);

So the groundwork had been laid all along: the exception is used here for control flow.
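
The caller's side of the story can be sketched in the same toy style. The example below reflectively invokes a main method, lets it abort by throwing, and then decides based on whether a plan was captured. All names (ReflectivePlanDemo, UserJob, AbortSignal, capturedPipeline) are hypothetical, and this is a simplified model rather than what PackagedProgram actually does. One detail it does show accurately for plain Java reflection: whatever main throws arrives wrapped in an InvocationTargetException.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class ReflectivePlanDemo {

    /** Hypothetical abort signal thrown once the plan has been recorded. */
    static final class AbortSignal extends Error {}

    /** Static slot playing the role of the plan environment's pipeline field. */
    static String capturedPipeline;

    /** Stand-in for the entry class of a user jar. */
    public static class UserJob {
        public static void main(String[] args) {
            // ... build the job, then "execute": record the plan and abort.
            capturedPipeline = "source -> map -> sink";
            throw new AbortSignal();
        }
    }

    /** Runs the entry class's main method only to learn its plan. */
    static String getPipeline(Class<?> entryClass) {
        capturedPipeline = null;
        try {
            Method main = entryClass.getMethod("main", String[].class);
            main.invoke(null, (Object) new String[0]);
        } catch (InvocationTargetException e) {
            // main threw something; if a plan was captured first, that was the expected abort.
            if (capturedPipeline != null) {
                return capturedPipeline;
            }
            throw new IllegalStateException("The program caused an error", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke main", e);
        }
        // main returned normally, so execute was never called.
        throw new IllegalStateException("The program plan could not be fetched");
    }

    public static void main(String[] args) {
        System.out.println(getPipeline(UserJob.class)); // prints "source -> map -> sink"
    }
}
```

Like the Flink listing, the sketch checks for a captured plan before looking at the exception itself, so a genuine failure in user code is only reported when no plan was recorded.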
