- 创建动态表工厂
类比于flume的sink,source,channel的模型,定义好connector,用户编写自定义的source端和siink端,就可以将数据需求sql化
实现架构图:
创建动态表工厂对于source端实现 DynamicTableSourceFactory 接口, DynamicTableSourceFactory 需要实现的方法
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
return null;
}
对于 DynamicTableSinkFactory
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
return null;
}
sink和source 都需要实现的公共方法
@Override
public String factoryIdentifier() {
return null;
}
@Override
public Set> requiredOptions() {
return null;
}
@Override
public Set> optionalOptions() {
return null;
}
flink 使用SPI机制加载 DynamicTableSourceFactory 和 DynamicTableSinkFactory 因此我们需要在resources/meta-INF/services定义org.apache.flink.table.factories.Factory 文件,内容为该实现类的全路径
@Override
public Set> requiredOptions() {
return null;
}
with里面必须要填写的属性配置
@Override
public Set> optionalOptions() {
return null;
}
with里面非必须填写属性配置
通常定义一个配置类,类名为xxOption,类似于枚举,在配置类中管理我们写connector中with的属性,table的option为ConfigOption
public static final ConfigOptionIP = ConfigOptions.key("ip") .stringType() .noDefaultValue() .withDescription("the redis's connect address.");
定义with 里面的 connector ,定义连接器,指定该tableSource的类型是什么
@Override
public String factoryIdentifier() {
return "redis";
}
重点来了,下游接口对接我们自定义的createDynamicTableSink 和 createDynamicTableSource 方法,可以使用内置的FactoryUtil.TableFactoryHelper 工具类来校验传入的参数,当然也可以自己编写校验逻辑,通过关联的上下文对象还能获取到表的元数据
ReadableConfig tableOptions = helper.getOptions();
通过 tableOptions 来获取参数,定义,借助网友例子
private void validateOptions(ReadableConfig options) {
switch (options.get(MODE)) {
case "single":
if (StringUtils.isEmpty(options.get(SINGLE_HOST))) {
throw new IllegalArgumentException("Parameter single.host must be provided in single mode");
}
break;
case "cluster":
if (StringUtils.isEmpty(options.get(CLUSTER_NODES))) {
throw new IllegalArgumentException("Parameter cluster.nodes must be provided in cluster mode");
}
break;
case "sentinel":
if (StringUtils.isEmpty(options.get(SENTINEL_NODES)) || StringUtils.isEmpty(options.get(SENTINEL_MASTER))) {
throw new IllegalArgumentException("Parameters sentinel.nodes and sentinel.master must be provided in sentinel mode");
}
break;
default:
throw new IllegalArgumentException("Invalid Redis mode. Must be single/cluster/sentinel");
}
}



