材料公司在使用Seatunnel的过程中,规划将Seatunnel集成在平台中,提供可视化操作。
因此目前有如下几个相关的需求:可以通过Web接口,传递参数,启动一个Seatunnel应用可以自定义日志,收集相关指标,目前想到的包括:应用的入流量、出流量;启动时间、结束时间等在任务结束后,可以用applicationId自动从yarn上收集日志(一是手动收集太麻烦,二是时间稍长日志就没了)
- Seatunnel:2.0.5
目前官方2版本还没有正式发布,只能自己下载源码编译。
从Github下载官方源码,clone到本地Idea
github:https://github.com/apache/incubator-seatunnel
官方地址:http://seatunnel.incubator.apache.org/
Idea下方Terminal命令行里,maven打包,执行:mvn clean install -Dmaven.test.skip=true
打包过程大约十几分钟,执行结束后,seatunnel-dist模块的target目录下,可以找到打好包的*.tar.gz压缩安装包
- Spark:2.4.8Hadoop:2.7
导读Seatunnel源码解析(1)-启动应用
Seatunnel源码解析(2)-加载配置文件
Seatunnel源码解析(3)-加载插件
Seatunnel源码解析(4) -启动Spark/Flink程序
Seatunnel源码解析(5)-修改启动LOGO
如何加载插件本章将从源码角度,解读Seatunnel如何根据配置,加载指定的插件代码
entryPoint
public class Seatunnel {
...
private static void entryPoint(String configFile, Engine engine) throws Exception {
// 根据.conf配置文件的路径,加载解析配置,并封装成ConfigBuilder
ConfigBuilder configBuilder = new ConfigBuilder(configFile, engine);
// 通过ConfigBuilder,加载配置文件中指定的Source、Transform、Sink插件
List sources = configBuilder.createPlugins(PluginType.SOURCE);
List transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
List sinks = configBuilder.createPlugins(PluginType.SINK);
// 通过ConfigBuilder,创建对应执行引擎(Spark/Flink)和执行模式(Batch/Flink)的执行器Execution
Execution execution = configBuilder.createExecution();
// 调用插件自定义的检查配置的逻辑
baseCheckConfig(sources, transforms, sinks);
// 调用插件自定义的插件执行的前置初始化逻辑
prepare(configBuilder.getEnv(), sources, transforms, sinks);
// 打印应用启动LOGO
showAsciiLogo();
// Execution提交Spark/Flink应用
execution.start(sources, transforms, sinks);
}
}
在上一节中,介绍了Seatunnel将配置文件内容,封装到ConfigBuilder中,顺带创建了Spark/Flink的上下文环境。
从entryPoint方法中,看出通过configBuilfer.createPlugins()函数创建Source、Transform、Sink插件
ConfigBuilder.createPlugins()
以构建Source插件为例,我们在第一节《启动一个应用》的配置文件中,指定Source插件为Fake
首先从配置中,拿到所有source的配置遍历每个source插件的配置,生成对应的插件实例,并添加到List中
public class ConfigBuilder{
...
public > List createPlugins(PluginType type) {
...
// 以Source为例,则这里的type即为PluginType.Source
List basePluginList = new ArrayList<>();
// 拿到conf文件里,所有source的配置
List extends Config> configList = config.getConfigList(type.getType());
configList.forEach(plugin -> {
// 遍历每个source的配置
try {
// 这个t就是要构建的插件,即baseSource的子类
T t = createPluginInstanceIgnoreCase(plugin.getString(PLUGIN_NAME_KEY), type);
// 将这个插件的配置传递到插件中,方便在自定义插件的其他函数中使用
t.setConfig(plugin);
// 保存已经生成好的插件
basePluginList.add(t);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return basePluginList;
}
...
}
ConfigBuilder.createPluginInstanceIgnoreCase()
在createPluginInstanceIgnoreCase中,创建一个具体的插件
参数是name:配置中的插件名称(Fake),pluginType:插件的类型(Source)判断是否为用户上传的自定义插件,判断标准是:插件名是否被.分割
用户自定义上传的插件,则配置中的插件名称为全类名,直接通过反射创建,返回创建好的插件官方包自带插件,则配置中的插件名为插件类的类名,需要通过官方的包命名规则,拼接出全类名根据传入的PluginType,通过反射构建出所有的baseSource(以Source为例)实现类的实例遍历所有实现类,找到与通过配置文件插件名拼接的全类名相等的插件实例,并返回创建好的插件找不到则抛出异常
public class ConfigBuilder{
...
private > T createPluginInstanceIgnoreCase(String name, PluginType pluginType) throws Exception {
LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.name:" + name);
LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.pluginType:" + pluginType.getType());
// 函数参数即Fake,Source
if (name.split("\.").length != 1) {
// 这里判断是用户自定义的插件(非官方包自带插件)
return (T) Class.forName(name).newInstance();
}
// 官方包自带插件生成方式
String packageName;
ServiceLoader plugins;
switch (pluginType) {
case SOURCE:
// 根据前面解析load()中的代码,得知此处packageName是:
// org.apache.seatunnel.spark.source
packageName = configPackage.getSourcePackage();
LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.packageName:" + packageName);
// configPackage.getbaseSourceClass()=
// org.apache.seatunnel.spark.source.baseSparkSource
// T: baseSource
Class baseSource = (Class) Class.forName(configPackage.getbaseSourceClass());
plugins = ServiceLoader.load(baseSource);
break;
case TRANSFORM:
packageName = configPackage.getTransformPackage();
Class baseTransform = (Class) Class.forName(configPackage.getbaseTransformClass());
plugins = ServiceLoader.load(baseTransform);
break;
case SINK:
packageName = configPackage.getSinkPackage();
Class baseSink = (Class) Class.forName(configPackage.getbaseSinkClass());
plugins = ServiceLoader.load(baseSink);
break;
default:
throw new IllegalArgumentException("PluginType not support : [" + pluginType + "]");
}
// canonicalName = org.apache.seatunnel.spark.source.Fake
String canonicalName = packageName + "." + name;
LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.canonicalName:" + canonicalName);
for (Iterator it = plugins.iterator(); it.hasNext(); ) {
try {
// ServiceLoader.load()将baseSource所有的子类都load出来
// 这里循环遍历每一个被加载出来的插件
// 通过判断插件类名与配置中的插件名是否相等,判断是否为要被创建的插件,并返回,因此
T plugin = it.next();
Class> serviceClass = plugin.getClass();
String serviceClassName = serviceClass.getName();
String clsNameToLower = serviceClassName.toLowerCase();
LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.serviceClassName:" + serviceClassName);
if (clsNameToLower.equals(canonicalName.toLowerCase())) {
return plugin;
}
} catch (ServiceConfigurationError e) {
// Iterator.next() may throw ServiceConfigurationError,
// but maybe caused by a not used plugin in this job
LOGGER.warn("Error when load plugin: [{}]", canonicalName, e);
}
}
throw new ClassNotFoundException("Plugin class not found by name :[" + canonicalName + "]");
}
...
}
Source、Transform、Sink插件都是相同的构建过程



