栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Seatunnel源码解析(3)-加载插件

Seatunnel源码解析(3)-加载插件

Seatunnel源码解析(3)-加载插件 需求

公司在使用Seatunnel的过程中,规划将Seatunnel集成在平台中,提供可视化操作。
因此目前有如下几个相关的需求:

    可以通过Web接口,传递参数,启动一个Seatunnel应用可以自定义日志,收集相关指标,目前想到的包括:应用的入流量、出流量;启动时间、结束时间等在任务结束后,可以用applicationId自动从yarn上收集日志(一是手动收集太麻烦,二是时间稍长日志就没了)
材料
    Seatunnel:2.0.5

目前官方2版本还没有正式发布,只能自己下载源码编译。
从Github下载官方源码,clone到本地Idea
github:https://github.com/apache/incubator-seatunnel
官方地址:http://seatunnel.incubator.apache.org/
Idea下方Terminal命令行里,maven打包,执行:mvn clean install -Dmaven.test.skip=true
打包过程大约十几分钟,执行结束后,seatunnel-dist模块的target目录下,可以找到打好包的*.tar.gz压缩安装包

    Spark:2.4.8Hadoop:2.7
任意门

Seatunnel源码解析(1)-启动应用
Seatunnel源码解析(2)-加载配置文件
Seatunnel源码解析(3)-加载插件
Seatunnel源码解析(4) -启动Spark/Flink程序
Seatunnel源码解析(5)-修改启动LOGO

导读

本章将从源码角度,解读Seatunnel如何根据配置,加载指定的插件代码

如何加载插件

entryPoint

public class Seatunnel {
    ...
    private static void entryPoint(String configFile, Engine engine) throws Exception {
        // 根据.conf配置文件的路径,加载解析配置,并封装成ConfigBuilder 
        ConfigBuilder configBuilder = new ConfigBuilder(configFile, engine);
        // 通过ConfigBuilder,加载配置文件中指定的Source、Transform、Sink插件
        List sources = configBuilder.createPlugins(PluginType.SOURCE);
        List transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
        List sinks = configBuilder.createPlugins(PluginType.SINK);
        //  通过ConfigBuilder,创建对应执行引擎(Spark/Flink)和执行模式(Batch/Flink)的执行器Execution
        Execution execution = configBuilder.createExecution();
        // 调用插件自定义的检查配置的逻辑
        baseCheckConfig(sources, transforms, sinks);
        // 调用插件自定义的插件执行的前置初始化逻辑
        prepare(configBuilder.getEnv(), sources, transforms, sinks);
        // 打印应用启动LOGO
        showAsciiLogo();
        // Execution提交Spark/Flink应用
        execution.start(sources, transforms, sinks);
    }
}

在上一节中,介绍了Seatunnel将配置文件内容,封装到ConfigBuilder中,顺带创建了Spark/Flink的上下文环境。
从entryPoint方法中,看出通过configBuilfer.createPlugins()函数创建Source、Transform、Sink插件

ConfigBuilder.createPlugins()

以构建Source插件为例,我们在第一节《启动一个应用》的配置文件中,指定Source插件为Fake

    首先从配置中,拿到所有source的配置遍历每个source插件的配置,生成对应的插件实例,并添加到List中
public class ConfigBuilder{
    ...
    public > List createPlugins(PluginType type) {
        ...
        // 以Source为例,则这里的type即为PluginType.Source
        List basePluginList = new ArrayList<>();
        // 拿到conf文件里,所有source的配置
        List configList = config.getConfigList(type.getType());
        configList.forEach(plugin -> {
            // 遍历每个source的配置
            try {
                // 这个t就是要构建的插件,即baseSource的子类
                T t = createPluginInstanceIgnoreCase(plugin.getString(PLUGIN_NAME_KEY), type);
                // 将这个插件的配置传递到插件中,方便在自定义插件的其他函数中使用
                t.setConfig(plugin);
                // 保存已经生成好的插件
                basePluginList.add(t);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        return basePluginList;
    }
    ...
}

ConfigBuilder.createPluginInstanceIgnoreCase()

在createPluginInstanceIgnoreCase中,创建一个具体的插件

参数是name:配置中的插件名称(Fake),pluginType:插件的类型(Source)判断是否为用户上传的自定义插件,判断标准是:插件名是否被.分割

    用户自定义上传的插件,则配置中的插件名称为全类名,直接通过反射创建,返回创建好的插件官方包自带插件,则配置中的插件名为插件类的类名,需要通过官方的包命名规则,拼接出全类名根据传入的PluginType,通过反射构建出所有的baseSource(以Source为例)实现类的实例遍历所有实现类,找到与通过配置文件插件名拼接的全类名相等的插件实例,并返回创建好的插件找不到则抛出异常
public class ConfigBuilder{
    ...
	private > T createPluginInstanceIgnoreCase(String name, PluginType pluginType) throws Exception {
        LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.name:" + name);
        LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.pluginType:" + pluginType.getType());
        // 函数参数即Fake,Source
        if (name.split("\.").length != 1) {
            // 这里判断是用户自定义的插件(非官方包自带插件)
            return (T) Class.forName(name).newInstance();
        }
        // 官方包自带插件生成方式
        String packageName;
        ServiceLoader plugins;
        switch (pluginType) {
            case SOURCE:
                // 根据前面解析load()中的代码,得知此处packageName是:
                // org.apache.seatunnel.spark.source
                packageName = configPackage.getSourcePackage();
                LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.packageName:" + packageName);
                // configPackage.getbaseSourceClass()=
                // org.apache.seatunnel.spark.source.baseSparkSource
                // T: baseSource
                Class baseSource = (Class) Class.forName(configPackage.getbaseSourceClass());
                plugins = ServiceLoader.load(baseSource);
                break;
            case TRANSFORM:
                packageName = configPackage.getTransformPackage();
                Class baseTransform = (Class) Class.forName(configPackage.getbaseTransformClass());
                plugins = ServiceLoader.load(baseTransform);
                break;
            case SINK:
                packageName = configPackage.getSinkPackage();
                Class baseSink = (Class) Class.forName(configPackage.getbaseSinkClass());
                plugins = ServiceLoader.load(baseSink);
                break;
            default:
                throw new IllegalArgumentException("PluginType not support : [" + pluginType + "]");
        }
        // canonicalName = org.apache.seatunnel.spark.source.Fake
        String canonicalName = packageName + "." + name;
        LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.canonicalName:" + canonicalName);
        for (Iterator it = plugins.iterator(); it.hasNext(); ) {
            try {
                // ServiceLoader.load()将baseSource所有的子类都load出来
                // 这里循环遍历每一个被加载出来的插件
                // 通过判断插件类名与配置中的插件名是否相等,判断是否为要被创建的插件,并返回,因此
                T plugin = it.next();
                Class serviceClass = plugin.getClass();
                String serviceClassName = serviceClass.getName();
                String clsNameToLower = serviceClassName.toLowerCase();
                LOGGER.info("self.ConfigBuilder.createPluginInstanceIgnoreCase.serviceClassName:" + serviceClassName);

                if (clsNameToLower.equals(canonicalName.toLowerCase())) {
                    return plugin;
                }
            } catch (ServiceConfigurationError e) {
                // Iterator.next() may throw ServiceConfigurationError,
                // but maybe caused by a not used plugin in this job
                LOGGER.warn("Error when load plugin: [{}]", canonicalName, e);
            }
        }
        throw new ClassNotFoundException("Plugin class not found by name :[" + canonicalName + "]");
    }
    ...
}

Source、Transform、Sink插件都是相同的构建过程

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773787.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号