栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Seatunnel源码解析(1)-启动应用

Seatunnel源码解析(1)-启动应用

Seatunnel源码解析(1)-启动应用 需求

公司在使用Seatunnel的过程中,规划将Seatunnel集成在平台中,提供可视化操作。
因此目前有如下几个相关的需求:

    可以通过Web接口,传递参数,启动一个Seatunnel应用可以自定义日志,收集相关指标,目前想到的包括:应用的入流量、出流量;启动时间、结束时间等在任务结束后,可以用applicationId自动从yarn上收集日志(一是手动收集太麻烦,二是时间稍长日志就没了)
材料
    Seatunnel:2.0.5

目前官方2版本还没有正式发布,只能自己下载源码编译。
从Github下载官方源码,clone到本地Idea
github:https://github.com/apache/incubator-seatunnel
官方地址:http://seatunnel.incubator.apache.org/
Idea下方Terminal命令行里,maven打包,执行:mvn clean install -Dmaven.test.skip=true
打包过程大约十几分钟,执行结束后,seatunnel-dist模块的target目录下,可以找到打好包的*.tar.gz压缩安装包

    Spark:2.4.8Hadoop:2.7
任意门

Seatunnel源码解析(1)-启动应用
Seatunnel源码解析(2)-加载配置文件
Seatunnel源码解析(3)-加载插件
Seatunnel源码解析(4) -启动Spark/Flink程序
Seatunnel源码解析(5)-修改启动LOGO

导读

本章将从一个具体的应用案例为出发点,解析Seatunnel启动一个应用的流程

配置应用文件

启动ST应用之前,需要先编写一个.conf的配置文件。
.conf文件的内容,包括env、source、transform、sink几部分,描述了数据的完整传递流程。
本文使用官方包提供好的示例文件"config/spark.batch.conf"

spark.batch.conf

env {
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

source {
  Fake {
    result_table_name = "my_dataset"
  }
}

transform {
}

sink {
  Console {}
}
启动应用
../bin/start-seatunnel-spark.sh 
 --master local 
 --deploy-mode client 
 --config example/spark.batch.conf
解析Shell启动脚本

官方的启动脚本,最后调用spark-submit命令,提交Spark应用程序。
可以看到提交的主类是:org.apache.seatunnel.SeatunnelSpark
可以初步判断SeatunnelSpark就是Seatunnel的主类入口

#!/bin/bash

...

exec ${SPARK_HOME}/bin/spark-submit --class org.apache.seatunnel.SeatunnelSpark 
    --name $(getAppName ${CONFIG_FILE}) 
    --jars $(echo ${LIB_DIR}/*.jar | tr ' ' ',') 
    --master ${MASTER} 
    --deploy-mode ${DEPLOY_MODE} 
    --driver-java-options "${clientModeDriverJavaOpts}" 
    --conf spark.executor.extraJavaOptions="${executorJavaOpts}" 
    --conf spark.driver.extraJavaOptions="${driverJavaOpts}" 
    ${sparkconf} 
    ${JarDepOpts} 
    ${FilesDepOpts} 
    ${assemblyJarName} ${CMD_ARGUMENTS}

在shell脚本中输出上面涉及到的变量,得到如下结果:

SPARK_HOME = /opt/spark
# spark-submit命令参数
CONFIG_FILE = config/spark.batch.conf
LIB_DIR = /opt/seatunnel/lib
MASTER = local
DEPLOY_MODE = client
clientModeDriverJavaOpts = 
executorJavaOpts = 
driverJavaOpts = 
# 主程序参数
sparkconf = --conf "spark.app.name=SeaTunnel" --conf "spark.executor.memory=1g" --conf "spark.executor.cores=1" --conf "spark.executor.instances=2"
JarDepOpts = 
FilesDepOpts = 
assemblyJarName = /opt/seatunnel/lib/seatunnel-core-spark.jar
CMD_ARGUMENTS = --master yarn --deploy-mode client --config example/spark.batch.conf
解析SeatunnelSpark应用主类

主类代码比较简单,先检查并封装配置,然后运行应用。

public class SeatunnelSpark {
    public static void main(String[] args) throws Exception {
	    //sparkconf = 
	    	// --conf "spark.app.name=SeaTunnel" 
	    	//--conf "spark.executor.memory=1g" 
	    	//--conf "spark.executor.cores=1" 
	    	//--conf "spark.executor.instances=2"
	    //JarDepOpts = 
	    //FilesDepOpts = 
	    //assemblyJarName = /opt/seatunnel/lib/seatunnel-core-spark.jar
	    //CMD_ARGUMENTS = 
	    	//--master yarn 
	    	//--deploy-mode client 
	    	//--config example/spark.batch.conf
        CommandLineArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
        Seatunnel.run(sparkArgs, SPARK);
    }
}

run()方法传入Spark枚举,Seatunnel目前主要提供了Spark和Flink两种执行引擎,所以这里枚举提供了Spark和Flink。

public enum Engine {
    SPARK("spark"), Flink("flink"), NULL("");
    ...
}

因此,类比Spark,Flink提供了类似的主类入口

public class SeatunnelFlink {
    public static void main(String[] args) throws Exception {
        CommandLineArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
        Seatunnel.run(flinkArgs, Flink);
    }
}

不管是Spark还是Flink,都会调用Seatunnel.run()

public class Seatunnel {
    ...
    public static void run(CommandLineArgs commandLineArgs, Engine engine) throws Exception {
        Common.setDeployMode(commandLineArgs.getDeployMode());
        String configFilePath = getConfigFilePath(commandLineArgs, engine);
        ...
        // 执行入口
        entryPoint(configFilePath, engine);
        ...
    }
    ...
}

重点关注entryPoint(configFilePath, engine)函数

    加载.conf文件内容,解析并封装成ConfigBuilder通过ConfigBuilder,加载配置文件中指定的Source、Transform、Sink插件通过ConfigBuilder,创建对应执行引擎(Spark/Flink)和执行模式(Batch/Flink)的执行器Execution检查上面创建好的每个Source、Transform、Sink插件的自定义配置,调用各自的checkConfig()方法初始化上面创建好的每个Source、Transform、Sink插件,调用插件自定义的prepare()方法打印应用启动LOGOExecution提交Spark/Flink应用

执行入口:entryPoint

public class Seatunnel {
    ...
    private static void entryPoint(String configFile, Engine engine) throws Exception {
        // 根据.conf配置文件的路径,加载解析配置,并封装成ConfigBuilder 
        ConfigBuilder configBuilder = new ConfigBuilder(configFile, engine);
        // 通过ConfigBuilder,加载配置文件中指定的Source、Transform、Sink插件
        List sources = configBuilder.createPlugins(PluginType.SOURCE);
        List transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
        List sinks = configBuilder.createPlugins(PluginType.SINK);
        //  通过ConfigBuilder,创建对应执行引擎(Spark/Flink)和执行模式(Batch/Flink)的执行器Execution
        Execution execution = configBuilder.createExecution();
        // 调用插件自定义的检查配置的逻辑
        baseCheckConfig(sources, transforms, sinks);
        // 调用插件自定义的插件执行的前置初始化逻辑
        prepare(configBuilder.getEnv(), sources, transforms, sinks);
        // 打印应用启动LOGO
        showAsciiLogo();
        // Execution提交Spark/Flink应用
        execution.start(sources, transforms, sinks);
    }
}
总结

通过上面执行流程的解析可以大致得到如下信息:

    Seatunnel目前支持Spark、Flink两种执行引擎使用Spark和Flink引擎,除入口脚本和入口主类不同外,后续的执行流程是相同的执行核心逻辑,主要集中在entryPoint()方法中
参考

https://www.cnblogs.com/javabg/p/8026881.html
https://blog.csdn.net/u013786868/article/details/80945721

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773758.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号