栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink sql 自定义connector 原理解读

flink sql 自定义connector 原理解读

文章目录
      • 创建动态表工厂

类比于flume的sink,source,channel的模型,定义好connector,用户编写自定义的source端和siink端,就可以将数据需求sql化

实现架构图:

创建动态表工厂

对于source端实现 DynamicTableSourceFactory 接口, DynamicTableSourceFactory 需要实现的方法

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        return null;
    }

对于 DynamicTableSinkFactory

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        return null;
    }

sink和source 都需要实现的公共方法

    @Override
    public String factoryIdentifier() {
        return null;
    }

    @Override
    public Set> requiredOptions() {
        return null;
    }

    @Override
    public Set> optionalOptions() {
        return null;
    }

flink 使用SPI机制加载 DynamicTableSourceFactory 和 DynamicTableSinkFactory 因此我们需要在resources/meta-INF/services定义org.apache.flink.table.factories.Factory 文件,内容为该实现类的全路径

  @Override
    public Set> requiredOptions() {
        return null;
    }

with里面必须要填写的属性配置

 @Override
    public Set> optionalOptions() {
        return null;
    }

with里面非必须填写属性配置

通常定义一个配置类,类名为xxOption,类似于枚举,在配置类中管理我们写connector中with的属性,table的option为ConfigOption类型

    public static final ConfigOption IP =
            ConfigOptions.key("ip")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the redis's connect address.");

定义with 里面的 connector ,定义连接器,指定该tableSource的类型是什么

    @Override
    public String factoryIdentifier() {
        return "redis";
    }

重点来了,下游接口对接我们自定义的createDynamicTableSink 和 createDynamicTableSource 方法,可以使用内置的FactoryUtil.TableFactoryHelper 工具类来校验传入的参数,当然也可以自己编写校验逻辑,通过关联的上下文对象还能获取到表的元数据

     ReadableConfig tableOptions = helper.getOptions();

通过 tableOptions 来获取参数,定义,借助网友例子

  private void validateOptions(ReadableConfig options) {
    switch (options.get(MODE)) {
      case "single":
        if (StringUtils.isEmpty(options.get(SINGLE_HOST))) {
          throw new IllegalArgumentException("Parameter single.host must be provided in single mode");
        }
        break;
      case "cluster":
        if (StringUtils.isEmpty(options.get(CLUSTER_NODES))) {
          throw new IllegalArgumentException("Parameter cluster.nodes must be provided in cluster mode");
        }
        break;
      case "sentinel":
        if (StringUtils.isEmpty(options.get(SENTINEL_NODES)) || StringUtils.isEmpty(options.get(SENTINEL_MASTER))) {
          throw new IllegalArgumentException("Parameters sentinel.nodes and sentinel.master must be provided in sentinel mode");
        }
        break;
      default:
        throw new IllegalArgumentException("Invalid Redis mode. Must be single/cluster/sentinel");
    }
  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584515.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号