材料公司在使用Seatunnel的过程中,规划将Seatunnel集成在平台中,提供可视化操作。
因此目前有如下几个相关的需求:可以通过Web接口,传递参数,启动一个Seatunnel应用可以自定义日志,收集相关指标,目前想到的包括:应用的入流量、出流量;启动时间、结束时间等在任务结束后,可以用applicationId自动从yarn上收集日志(一是手动收集太麻烦,二是时间稍长日志就没了)
- Seatunnel:2.0.5
目前官方2版本还没有正式发布,只能自己下载源码编译。
从Github下载官方源码,clone到本地Idea
github:https://github.com/apache/incubator-seatunnel
官方地址:http://seatunnel.incubator.apache.org/
Idea下方Terminal命令行里,maven打包,执行:mvn clean install -Dmaven.test.skip=true
打包过程大约十几分钟,执行结束后,seatunnel-dist模块的target目录下,可以找到打好包的*.tar.gz压缩安装包
- Spark:2.4.8Hadoop:2.7
导读Seatunnel源码解析(1)-启动应用
Seatunnel源码解析(2)-加载配置文件
Seatunnel源码解析(3)-加载插件
Seatunnel源码解析(4) -启动Spark/Flink程序
Seatunnel源码解析(5)-修改启动LOGO
执行Spark/Flink应用本章将从源码角度,解读Seatunnel如何将创建好的Source、Transform、Sink插件,组织成一个完整的Spark/Flink应用,并提交执行
entryPoint
public class Seatunnel {
...
private static void entryPoint(String configFile, Engine engine) throws Exception {
// 根据.conf配置文件的路径,加载解析配置,并封装成ConfigBuilder
ConfigBuilder configBuilder = new ConfigBuilder(configFile, engine);
// 通过ConfigBuilder,加载配置文件中指定的Source、Transform、Sink插件
List sources = configBuilder.createPlugins(PluginType.SOURCE);
List transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
List sinks = configBuilder.createPlugins(PluginType.SINK);
// 通过ConfigBuilder,创建对应执行引擎(Spark/Flink)和执行模式(Batch/Flink)的执行器Execution
Execution execution = configBuilder.createExecution();
// 调用插件自定义的检查配置的逻辑
baseCheckConfig(sources, transforms, sinks);
// 调用插件自定义的插件执行的前置初始化逻辑
prepare(configBuilder.getEnv(), sources, transforms, sinks);
// 打印应用启动LOGO
showAsciiLogo();
// Execution提交Spark/Flink应用
execution.start(sources, transforms, sinks);
}
}
Execuion接口
Execution接口,提供一个start()方法,用来启动一个具体的Seatunnel的job(Spark/Flink)
start()方法中定义了具体的source读取数据,transform转换数据,sink写数据的流程,即一个完整的的Spark/Flink代码流程
Execution接口,继承Plugin接口,有5个实现类
public interface Executionextends Plugin { void start(List sources, List transforms, List sinks) throws Exception; }
ConfigBuilder.createExecution()
在createExecution方法中,根据执行引擎和是否流模式,创建具体的Execution的实现类
public class ConfigBuilder {
public Execution createExecution() {
Execution execution = null;
switch (engine) {
case SPARK:
SparkEnvironment sparkEnvironment = (SparkEnvironment) env;
if (streaming) {
// SparkStreamingExecution:scala代码,实现Executin接口
execution = new SparkStreamingExecution(sparkEnvironment);
} else {
// SparkBatchExecution:java代码,实现Executin接口
execution = new SparkBatchExecution(sparkEnvironment);
}
break;
case Flink:
FlinkEnvironment flinkEnvironment = (FlinkEnvironment) env;
if (streaming) {
// FlinkStreamExecution:java代码,实现Executin接口
execution = new FlinkStreamExecution(flinkEnvironment);
} else {
// FlinkBatchExecution:java代码,实现Executin接口
execution = new FlinkBatchExecution(flinkEnvironment);
}
break;
default:
break;
}
return execution;
}
}
baseCheckConfig()
这个方法调用每个插件自定义的checkCofig()函数,在执行前,检查插件的必须配置
public class Seatunnel {
...
@SafeVarargs
private static void baseCheckConfig(List extends Plugin>... plugins) {
for (List extends Plugin> pluginList : plugins) {
for (Plugin plugin : pluginList) {
CheckResult checkResult;
try {
checkResult = plugin.checkConfig();
} catch (Exception e) {
checkResult = CheckResult.error(e.getMessage());
}
if (!checkResult.isSuccess()) {
LOGGER.error("Plugin[{}] contains invalid config, error: {} n", plugin.getClass().getName(), checkResult.getMsg());
System.exit(-1); // invalid configuration
}
}
}
deployModeCheck();
}
...
}
prepare()
这个方法,调用每个插件自定义的prepare()方法,可以做一些正式执行前的前置工作
public class Seatunnel {
...
private static void prepare(RuntimeEnv env, List extends Plugin>... plugins) {
for (List extends Plugin> pluginList : plugins) {
pluginList.forEach(plugin -> plugin.prepare(env));
}
}
...
}
Execution.start()
Execution调用start()方法,开启执行的具体的Seatunnel的job(Spark/Flink)
以SparkBatchExecution为例start的逻辑按照Source、Transform、Sink分为3层Source层:
遍历每个source插件,调用插件自身的getData方法,读数据,并在spark上下文中,注册成临时表
Transform层:
执行transform是,需要找到transform对接的上游DataSet取第一个Source插件的DataSet作为第一个Transform的默认DataSet,即数据流的起点如果配置文件中指定了上游DataSet的table name,则通过SparkSession直接读具体的临时表如果没有指定,上一个DataSet就是默认的上游,即按照配置文件的从上而下的顺序,依次连接transform代码里的ds变量,可以理解为数据流链表的尾节点每执行一个transform逻辑后,注册新的DataSet的临时表
Sink层
sink层与transform层逻辑类似如果配置文件中指定了上游的DataSet的table name,则通过SparkSession直接读具体的上游临时表如果没有指定,则传入的DataSet就是默认的上游DataSet,即按照配置文件从上而下的顺序,连接最后一个Transform形成的DataSet
public class SparkBatchExecution implements Execution{ private final SparkEnvironment environment; private Config config = ConfigFactory.empty(); public SparkBatchExecution(SparkEnvironment environment) { this.environment = environment; } public static void registerTempView(String tableName, Dataset ds) { ds.createOrReplaceTempView(tableName); } public static void registerInputTempView(baseSparkSource
> source, SparkEnvironment environment) { Config config = source.getConfig(); if (config.hasPath(RESULT_TABLE_NAME)) { // 调用source的getData方法读取数据,获取DataSet,并根据配置,注册临时表 String tableName = config.getString(RESULT_TABLE_NAME); registerTempView(tableName, source.getData(environment)); } else { throw new ConfigRuntimeException("Plugin[" + source.getClass().getName() + "] " + "must be registered as dataset/table, please set "" + RESULT_TABLE_NAME + "" config"); } } public static Dataset transformProcess(SparkEnvironment environment, baseSparkTransform transform, Dataset
ds) { Dataset
fromDs; Config config = transform.getConfig(); // 找到transform对接的上游DataSet // 如果配置文件中指定了上游DataSet的table name,则通过SparkSession直接读具体的临时表 // 如果没有指定,则传入的DataSet就是默认的上游,即按照配置文件的从上而下的顺序,依次连接transform if (config.hasPath(SOURCE_TABLE_NAME)) { String sourceTableName = config.getString(SOURCE_TABLE_NAME); fromDs = environment.getSparkSession().read().table(sourceTableName); } else { fromDs = ds; } // 执行transform的具体逻辑,返回DataSet return transform.process(fromDs, environment); } public static void registerTransformTempView(baseSparkTransform transform, Dataset
ds) { Config config = transform.getConfig(); if (config.hasPath(RESULT_TABLE_NAME)) { String resultTableName = config.getString(RESULT_TABLE_NAME); registerTempView(resultTableName, ds); } } public static void sinkProcess(SparkEnvironment environment, baseSparkSink> sink, Dataset
ds) { Dataset
fromDs; Config config = sink.getConfig(); // 与Transform的处理逻辑大致相同,需要找到sink对接的上游DataSet // 如果配置文件中指定了上游的DataSet的table name,则通过SparkSession直接读具体的临时表 // 如果没有指定,则传入的DataSet就是默认的上游,即按照配置文件从上而下的顺序,连接最后一个Transform形成的DataSet if (config.hasPath(SOURCE_TABLE_NAME)) { String sourceTableName = config.getString(SOURCE_TABLE_NAME); fromDs = environment.getSparkSession().read().table(sourceTableName); } else { fromDs = ds; } sink.output(fromDs, environment); } @Override public void start(List
sources, List transforms, List sinks) { // 遍历每个source插件,调用getData方法,读数据,并在spark上下文中,注册成临时表 sources.forEach(source -> registerInputTempView(source, environment)); if (!sources.isEmpty()) { // 这里取第一个插件的DataSet作为后面连接Transform的默认Source Dataset ds = sources.get(0).getData(environment); for (baseSparkTransform transform : transforms) { // takeAsList(int n):获取前n行数据,并以List的形式展现 // 遍历每个transform插件,判断上一个DataSet是否有数据,有则执行transform逻辑 // 这里有个疑问:如果有多个source,偏偏就第一个source没有数据,则所有的transform逻辑都不会执行 if (ds.takeAsList(1).size() > 0) { // 执行transforma的处理逻辑,用新的DataSet替换旧的ds ds = transformProcess(environment, transform, ds); // 注册临时表 registerTransformTempView(transform, ds); } } // 同样的方式,开始执行sink逻辑 for (SparkBatchSink sink : sinks) { sinkProcess(environment, sink, ds); } } } @Override public void setConfig(Config config) { this.config = config; } @Override public Config getConfig() { return this.config; } @Override public CheckResult checkConfig() { return CheckResult.success(); } @Override public void prepare(Void prepareEnv) { } }
疑问
如果有多个source,偏偏就第一个source没有数据,则按照代码transform部分的执行规则,后面所有的transform逻辑都不会执行



