本文参考自《实战自定义 Flink SQL Connector（w/ Flink 1.11 & Redis）》一文。
1. 目录结构  2. pom  3. 代码  3.1 Constants

pom.xml（依据原文残留的坐标与版本号还原）：

<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xxx</groupId>
    <artifactId>FlinkRedisDynamicTable</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version>
        <redis.version>3.7.0</redis.version>
        <guava.version>31.0.1-jre</guava.version>
        <lombok.version>1.18.22</lombok.version>
        <maven-shade.version>3.2.0</maven-shade.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
package com.xxx.cn.common;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import redis.clients.jedis.Protocol;

/**
 * Option definitions and shared constants for the custom Redis SQL connector.
 *
 * <p>Every {@link ConfigOption} declared here is surfaced through the table
 * factory's required/optional option sets and read back via
 * {@code ReadableConfig.get(...)}; the generic type parameter on each option
 * is required for those call sites (e.g. {@code options.get(COMMAND).toUpperCase()})
 * to compile.
 */
public class Constants {

    /** Factory identifier used in the DDL WITH clause: 'connector' = 'redis'. */
    public static final String IDENTIFIER = "redis";
    /** Separator between node entries in cluster.nodes / sentinel.nodes. */
    public static final String SEPARATOR = ",";
    /** Separator between host and port within one node entry ("host:port"). */
    public static final String HOST_SEPARATOR = ":";

    /** Column count for string-typed tables (GET/SET): key + value. */
    public static final int REDIS_GET_COLUMN_MAX_COUNT = 2;
    /** Column count for hash-typed tables (HGET/HSET): hash key + field + value. */
    public static final int REDIS_HGET_COLUMN_MAX_COUNT = 3;

    /** Deployment mode: "single" (default), "cluster" or "sentinel". */
    public static final ConfigOption<String> MODE = ConfigOptions
            .key("mode")
            .stringType()
            .defaultValue("single");

    /** Redis host for single mode. */
    public static final ConfigOption<String> SINGLE_HOST = ConfigOptions
            .key("single.host")
            .stringType()
            .defaultValue(Protocol.DEFAULT_HOST);

    /** Redis port for single mode. */
    public static final ConfigOption<Integer> SINGLE_PORT = ConfigOptions
            .key("single.port")
            .intType()
            .defaultValue(Protocol.DEFAULT_PORT);

    /** Comma-separated "host:port" list for cluster mode. */
    public static final ConfigOption<String> CLUSTER_NODES = ConfigOptions
            .key("cluster.nodes")
            .stringType()
            .noDefaultValue();

    /** Comma-separated "host:port" list of sentinels for sentinel mode. */
    public static final ConfigOption<String> SENTINEL_NODES = ConfigOptions
            .key("sentinel.nodes")
            .stringType()
            .noDefaultValue();

    /** Master name monitored by the sentinels. */
    public static final ConfigOption<String> SENTINEL_MASTER = ConfigOptions
            .key("sentinel.master")
            .stringType()
            .noDefaultValue();

    /** Redis AUTH password; null (unset) means no authentication. */
    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue();

    /** Redis write command (e.g. SET, HSET); resolved to a Bahir RedisCommand. */
    public static final ConfigOption<String> COMMAND = ConfigOptions
            .key("command")
            .stringType()
            .noDefaultValue();

    /** Logical database index (not supported in cluster mode). */
    public static final ConfigOption<Integer> DB_NUM = ConfigOptions
            .key("db-num")
            .intType()
            // NOTE: fixed from the garbled "Protocol.DEFAULT_DATAbase";
            // the Jedis constant is Protocol.DEFAULT_DATABASE.
            .defaultValue(Protocol.DEFAULT_DATABASE);

    /** Optional TTL (seconds) applied to written keys. */
    public static final ConfigOption<Integer> TTL_SEC = ConfigOptions
            .key("ttl-sec")
            .intType()
            .noDefaultValue();

    /** Connection timeout in milliseconds. */
    public static final ConfigOption<Integer> CONNECTION_TIMEOUT_MS = ConfigOptions
            .key("connection.timeout-ms")
            .intType()
            .defaultValue(Protocol.DEFAULT_TIMEOUT);

    /** Jedis pool: maximum number of connections. */
    public static final ConfigOption<Integer> CONNECTION_MAX_TOTAL = ConfigOptions
            .key("connection.max-total")
            .intType()
            .defaultValue(GenericObjectPoolConfig.DEFAULT_MAX_TOTAL);

    /** Jedis pool: maximum idle connections. */
    public static final ConfigOption<Integer> CONNECTION_MAX_IDLE = ConfigOptions
            .key("connection.max-idle")
            .intType()
            .defaultValue(GenericObjectPoolConfig.DEFAULT_MAX_IDLE);

    /** Jedis pool: minimum idle connections. */
    public static final ConfigOption<Integer> CONNECTION_MIN_IDLE = ConfigOptions
            .key("connection.min-idle")
            .intType()
            .defaultValue(GenericObjectPoolConfig.DEFAULT_MIN_IDLE);

    /** Jedis pool: validate connections when borrowed. */
    public static final ConfigOption<Boolean> CONNECTION_TEST_ON_BORROW = ConfigOptions
            .key("connection.test-on-borrow")
            .booleanType()
            .defaultValue(GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW);

    /** Jedis pool: validate connections when returned. */
    public static final ConfigOption<Boolean> CONNECTION_TEST_ON_RETURN = ConfigOptions
            .key("connection.test-on-return")
            .booleanType()
            .defaultValue(GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN);

    /** Jedis pool: validate idle connections in the background. */
    public static final ConfigOption<Boolean> CONNECTION_TEST_WHILE_IDLE = ConfigOptions
            .key("connection.test-while-idle")
            .booleanType()
            .defaultValue(GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE);

    /** Hash key for HGET/HSET tables; only meaningful for hash/sorted-set commands. */
    public static final ConfigOption<String> HASH_ADDITIONAL_KEY = ConfigOptions
            .key("hash.additional-key")
            .stringType()
            .noDefaultValue();

    /** Lookup cache capacity; -1 disables caching. */
    public static final ConfigOption<Integer> LOOKUP_CACHE_MAX_ROWS = ConfigOptions
            .key("lookup.cache.max-rows")
            .intType()
            .defaultValue(-1);

    /** Lookup cache entry TTL in seconds; -1 disables expiry. */
    public static final ConfigOption<Integer> LOOKUP_CACHE_TTL_SEC = ConfigOptions
            .key("lookup.cache.ttl-sec")
            .intType()
            .defaultValue(-1);
}
3.2 RedisCommandsContainerPlus
package com.xxx.cn.container;
import java.io.IOException;
/**
 * Read-capable Redis command container.
 *
 * <p>Bahir's built-in containers only expose write commands; this interface
 * adds the two read operations (GET / HGET) needed by the lookup source.
 * Implementations wrap a concrete Jedis client (pool, sentinel pool or
 * cluster) — see RedisContainerPlus and RedisClusterContainerPlus.
 */
public interface RedisCommandsContainerPlus {
// Initializes the underlying client/pool; call before any get/hget.
void open() throws Exception;
// Returns the string value stored at key (GET); null if the key is absent.
String get(String key);
// Returns the value of hashField inside the hash at key (HGET); null if absent.
String hget(String key, String hashField);
// Releases the underlying client/pool resources.
void close() throws IOException;
}
3.3 RedisContainerPlus
package com.xxx.cn.container;

import org.apache.flink.streaming.connectors.redis.common.container.RedisContainer;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;
import java.io.Closeable;

/**
 * Read-capable Redis container for single-node and sentinel deployments.
 *
 * <p>Extends Bahir's write-only {@link RedisContainer} with GET/HGET support.
 * Exactly one of the two pools is non-null, depending on which constructor
 * was used; {@link #getInstance()} dispatches on that.
 */
public class RedisContainerPlus extends RedisContainer implements RedisCommandsContainerPlus, Closeable {

    private static final long serialVersionUID = 1L;

    // transient: Jedis pools are not serializable; the container is rebuilt on
    // the task manager via RedisCommandsContainerPlusBuilder rather than shipped
    // with the job graph.
    private transient JedisPool jedisPool;
    private transient JedisSentinelPool jedisSentinelPool;

    /**
     * Creates a container backed by a plain connection pool (single mode).
     *
     * @param jedisPool pool to borrow connections from; must not be null
     */
    public RedisContainerPlus(JedisPool jedisPool) {
        super(jedisPool);
        this.jedisPool = jedisPool;
        this.jedisSentinelPool = null;
    }

    /**
     * Creates a container backed by a sentinel-managed pool (sentinel mode).
     *
     * @param sentinelPool sentinel pool to borrow connections from; must not be null
     */
    public RedisContainerPlus(JedisSentinelPool sentinelPool) {
        super(sentinelPool);
        this.jedisPool = null;
        this.jedisSentinelPool = sentinelPool;
    }

    /**
     * Executes GET.
     *
     * @param key Redis key
     * @return the stored value, or null if the key does not exist
     */
    @Override
    public String get(String key) {
        // If borrowing fails there is nothing to release; the pool's exception
        // propagates to the caller unchanged (the original catch-and-rethrow
        // added nothing).
        Jedis jedis = getInstance();
        try {
            return jedis.get(key);
        } finally {
            releaseInstance(jedis);
        }
    }

    /**
     * Executes HGET.
     *
     * @param key       Redis hash key
     * @param hashField field inside the hash
     * @return the stored value, or null if key/field does not exist
     */
    @Override
    public String hget(String key, String hashField) {
        Jedis jedis = getInstance();
        try {
            return jedis.hget(key, hashField);
        } finally {
            releaseInstance(jedis);
        }
    }

    /** Borrows a connection from whichever pool this container was built with. */
    private Jedis getInstance() {
        if (jedisSentinelPool != null) {
            return jedisSentinelPool.getResource();
        }
        return jedisPool.getResource();
    }

    /**
     * Returns a borrowed connection to its pool. Best-effort: a failure to
     * return a connection must not mask the result (or exception) of the
     * command that used it, so any close error is deliberately ignored.
     */
    private void releaseInstance(final Jedis jedis) {
        if (jedis == null) {
            return;
        }
        try {
            jedis.close();
        } catch (Exception ignored) {
            // intentionally swallowed — see method comment
        }
    }
}
3.4 RedisClusterContainerPlus
package com.xxx.cn.container;

import org.apache.flink.streaming.connectors.redis.common.container.RedisClusterContainer;
import redis.clients.jedis.JedisCluster;

/**
 * Read-capable Redis container for cluster deployments.
 *
 * <p>Extends Bahir's write-only {@link RedisClusterContainer} with GET/HGET
 * support. Unlike the pooled containers, {@link JedisCluster} manages its own
 * connections internally, so no explicit borrow/release is needed here.
 */
public class RedisClusterContainerPlus extends RedisClusterContainer implements RedisCommandsContainerPlus {

    // Added for consistency with RedisContainerPlus (the superclass is Serializable).
    private static final long serialVersionUID = 1L;

    // transient: JedisCluster is not serializable; recreated on the task manager.
    private transient JedisCluster jedisCluster;

    /**
     * @param jedisCluster cluster client to execute commands against; must not be null
     */
    public RedisClusterContainerPlus(JedisCluster jedisCluster) {
        super(jedisCluster);
        this.jedisCluster = jedisCluster;
    }

    /**
     * Executes GET against the cluster.
     *
     * @return the stored value, or null if the key does not exist
     */
    @Override
    public String get(String key) {
        // The original catch(Exception){throw e;} was a no-op and has been removed.
        return jedisCluster.get(key);
    }

    /**
     * Executes HGET against the cluster.
     *
     * @return the stored value, or null if key/field does not exist
     */
    @Override
    public String hget(String key, String hashField) {
        return jedisCluster.hget(key, hashField);
    }
}
3.5 RedisCommandsContainerPlusBuilder
package com.xxx.cn.container;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
// NOTE: fixed class-name casing — the Bahir class is FlinkJedisConfigBase
// (the article's extraction mangled it to "FlinkJedisConfigbase").
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;
import java.util.Objects;

/**
 * Factory for {@link RedisCommandsContainerPlus} instances, mirroring Bahir's
 * RedisCommandsContainerBuilder but producing the read-capable containers.
 */
public class RedisCommandsContainerPlusBuilder {

    private RedisCommandsContainerPlusBuilder() {
        // utility class — no instances
    }

    /**
     * Dispatches to the mode-specific builder based on the concrete config type.
     *
     * @throws IllegalArgumentException if the config is not a pool/cluster/sentinel config
     */
    public static RedisCommandsContainerPlus build(FlinkJedisConfigBase flinkJedisConfigBase) {
        if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
            return build((FlinkJedisPoolConfig) flinkJedisConfigBase);
        } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
            return build((FlinkJedisClusterConfig) flinkJedisConfigBase);
        } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
            return build((FlinkJedisSentinelConfig) flinkJedisConfigBase);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

    /** Builds a single-node container backed by a {@link JedisPool}. */
    public static RedisCommandsContainerPlus build(FlinkJedisPoolConfig jedisPoolConfig) {
        Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");
        JedisPool jedisPool = new JedisPool(createPoolConfig(jedisPoolConfig),
                jedisPoolConfig.getHost(), jedisPoolConfig.getPort(),
                jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
                jedisPoolConfig.getDatabase());
        return new RedisContainerPlus(jedisPool);
    }

    /** Builds a cluster container backed by a {@link JedisCluster}. */
    public static RedisCommandsContainerPlus build(FlinkJedisClusterConfig jedisClusterConfig) {
        Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");
        JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
                jedisClusterConfig.getConnectionTimeout(),
                jedisClusterConfig.getMaxRedirections(),
                createPoolConfig(jedisClusterConfig));
        return new RedisClusterContainerPlus(jedisCluster);
    }

    /** Builds a sentinel container backed by a {@link JedisSentinelPool}. */
    public static RedisCommandsContainerPlus build(FlinkJedisSentinelConfig jedisSentinelConfig) {
        Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
        JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(
                jedisSentinelConfig.getMasterName(), jedisSentinelConfig.getSentinels(),
                createPoolConfig(jedisSentinelConfig),
                jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
                jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
        return new RedisContainerPlus(jedisSentinelPool);
    }

    /**
     * Copies the pool sizing shared by all three modes (max-total / max-idle /
     * min-idle) out of the Flink config. Kept raw to stay compatible with the
     * commons-pool2 version shipped by the pinned Jedis release.
     */
    @SuppressWarnings("rawtypes")
    private static GenericObjectPoolConfig createPoolConfig(FlinkJedisConfigBase config) {
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxIdle(config.getMaxIdle());
        poolConfig.setMaxTotal(config.getMaxTotal());
        poolConfig.setMinIdle(config.getMinIdle());
        return poolConfig;
    }
}
3.6 RedisDynamicTableSink
package com.xxx.cn.sink;

import com.xxx.cn.utils.FlinkJedisConfigUtil;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
import org.apache.flink.streaming.connectors.redis.RedisSink;
// NOTE: fixed class-name casing — the Bahir class is FlinkJedisConfigBase.
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.RowKind;
import java.util.List;
import java.util.Optional;
import static com.xxx.cn.common.Constants.*;

/**
 * {@link DynamicTableSink} that writes rows to Redis through Bahir's
 * {@link RedisSink}.
 *
 * <p>Supported table shapes (all columns STRING):
 * <ul>
 *   <li>2 columns (key, value) for plain commands such as SET;</li>
 *   <li>3 columns (additional key, field, value) for hash / sorted-set commands.</li>
 * </ul>
 */
public class RedisDynamicTableSink implements DynamicTableSink {

    private final ReadableConfig options;
    private final ResolvedSchema schema;

    public RedisDynamicTableSink(ReadableConfig options, ResolvedSchema schema) {
        this.options = options;
        this.schema = schema;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        // Upserts into Redis are idempotent overwrites, so UPDATE_BEFORE rows
        // are accepted (and effectively ignored by re-writing the key).
        // NOTE: fixed the garbled constant "UPDATe_BEFORE" -> UPDATE_BEFORE.
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        Preconditions.checkNotNull(options, "No options supplied");
        FlinkJedisConfigBase jedisConfig = FlinkJedisConfigUtil.getFlinkJedisConfig(options);
        Preconditions.checkNotNull(jedisConfig, "No Jedis config supplied");

        RedisCommand command = RedisCommand.valueOf(options.get(COMMAND).toUpperCase());

        // Hash / sorted-set commands need an extra leading column for the
        // additional key; plain commands take exactly (key, value).
        int expectedCount = needAdditionalKey(command)
                ? REDIS_HGET_COLUMN_MAX_COUNT
                : REDIS_GET_COLUMN_MAX_COUNT;
        int fieldCount = schema.getColumnCount();
        if (fieldCount != expectedCount) {
            throw new ValidationException("Redis sink only supports 2 or 3 columns");
        }

        // Only STRING (VARCHAR) columns are supported — Redis values are strings.
        List<DataType> columnDataTypes = schema.getColumnDataTypes();
        for (DataType columnDataType : columnDataTypes) {
            if (!columnDataType.getLogicalType().getTypeRoot().equals(LogicalTypeRoot.VARCHAR)) {
                throw new ValidationException("Redis connector only supports STRING type");
            }
        }

        RedisMapper<RowData> mapper = new RedisRowDataMapper(options, command);
        RedisSink<RowData> redisSink = new RedisSink<>(jedisConfig, mapper);
        return SinkFunctionProvider.of(redisSink);
    }

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(options, schema);
    }

    @Override
    public String asSummaryString() {
        return "Redis Dynamic Table Sink";
    }

    /** True for commands that address a nested structure (hash / sorted set). */
    private static boolean needAdditionalKey(RedisCommand command) {
        return command.getRedisDataType() == RedisDataType.HASH
                || command.getRedisDataType() == RedisDataType.SORTED_SET;
    }

    /**
     * Maps a {@link RowData} onto the (additionalKey, key, value) triple that
     * Bahir's sink expects. Column layout: for hash-like commands the row is
     * (additional key, field, value); otherwise (key, value).
     */
    public static final class RedisRowDataMapper implements RedisMapper<RowData> {

        private static final long serialVersionUID = 1L;

        private final ReadableConfig options;
        private final RedisCommand command;
        // Fallback additional key from 'hash.additional-key'; may be null, in
        // which case getAdditionalKey(RowData) supplies it per row.
        private final String additionalKey;

        public RedisRowDataMapper(ReadableConfig options, RedisCommand command) {
            this.options = options;
            this.command = command;
            this.additionalKey = options.get(HASH_ADDITIONAL_KEY);
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(command, additionalKey);
        }

        @Override
        public String getKeyFromData(RowData data) {
            // Hash-like rows carry the additional key in column 0, shifting
            // the key to column 1.
            return data.getString(needAdditionalKey(command) ? 1 : 0).toString();
        }

        @Override
        public String getValueFromData(RowData data) {
            return data.getString(needAdditionalKey(command) ? 2 : 1).toString();
        }

        // No @Override on the two methods below: they match default methods on
        // RedisMapper in the pinned Bahir version — TODO confirm against the
        // exact flink-connector-redis release in use.
        public Optional<String> getAdditionalKey(RowData data) {
            return needAdditionalKey(command)
                    ? Optional.of(data.getString(0).toString())
                    : Optional.empty();
        }

        public Optional<Integer> getAdditionalTTL(RowData data) {
            return options.getOptional(TTL_SEC);
        }
    }
}
3.7 RedisDynamicTableSource（注：原文此节标题为 RedisRowDataLookupFunction，但随后的代码实为 RedisDynamicTableSource；RedisRowDataLookupFunction 的源码在原文抓取时丢失）
package com.xxx.cn.source;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import java.util.List;
import static com.xxx.cn.common.Constants.REDIS_GET_COLUMN_MAX_COUNT;

/**
 * Lookup-only {@link DynamicTableSource} backed by Redis GET/HGET, used for
 * dimension-table joins (JOIN ... FOR SYSTEM_TIME AS OF ...).
 *
 * <p>The table must have exactly two STRING columns (key, value) and the join
 * must use a single equality key.
 */
public class RedisDynamicTableSource implements LookupTableSource {

    private final ReadableConfig options;
    private final ResolvedSchema schema;

    public RedisDynamicTableSource(ReadableConfig options, ResolvedSchema schema) {
        this.options = options;
        this.schema = schema;
    }

    @Override
    public DynamicTableSource copy() {
        return new RedisDynamicTableSource(options, schema);
    }

    @Override
    public String asSummaryString() {
        return "Redis Dynamic Table Source";
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        // Redis is a key-value store: exactly one join key, not composite.
        Preconditions.checkArgument(
                context.getKeys().length == 1 && context.getKeys()[0].length == 1,
                "Redis source only supports lookup by single key");

        int columnCount = schema.getColumnCount();
        if (columnCount != REDIS_GET_COLUMN_MAX_COUNT) {
            throw new ValidationException("Redis source only supports 2 columns");
        }

        // Only STRING (VARCHAR) columns are supported.
        List<DataType> columnDataTypes = schema.getColumnDataTypes();
        for (DataType columnDataType : columnDataTypes) {
            if (!columnDataType.getLogicalType().getTypeRoot().equals(LogicalTypeRoot.VARCHAR)) {
                throw new ValidationException("Redis connector only supports STRING type");
            }
        }

        return TableFunctionProvider.of(new RedisRowDataLookupFunction(options));
    }
}
3.8 RedisDynamicTableSource（注意：下方代码与 3.7 节完全重复，疑为原文抓取错误）
// NOTE(review): this class is a byte-for-byte duplicate of the one in the
// previous section — the article's extraction appears to have pasted the
// same file twice (the missing RedisRowDataLookupFunction is referenced at
// the bottom but never shown). Kept verbatim; see the previous section for
// the annotated copy.
package com.xxx.cn.source;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import java.util.List;
import static com.xxx.cn.common.Constants.REDIS_GET_COLUMN_MAX_COUNT;
public class RedisDynamicTableSource implements LookupTableSource {
private final ReadableConfig options;
private final ResolvedSchema schema;
public RedisDynamicTableSource(ReadableConfig options, ResolvedSchema schema) {
this.options = options;
this.schema = schema;
}
@Override
public DynamicTableSource copy() {
return new RedisDynamicTableSource(options, schema);
}
@Override
public String asSummaryString() {
return "Redis Dynamic Table Source";
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// Redis lookup joins must use exactly one (non-composite) equality key.
Preconditions.checkArgument(context.getKeys().length == 1 && context.getKeys()[0].length == 1, "Redis source only supports lookup by single key");
int columnCount = schema.getColumnCount();
if (columnCount != REDIS_GET_COLUMN_MAX_COUNT) {
throw new ValidationException("Redis source only supports 2 columns");
}
List columnDataTypes = schema.getColumnDataTypes();
for (int i = 0; i < columnCount; i++) {
if (!columnDataTypes.get(i).getLogicalType().getTypeRoot().equals(LogicalTypeRoot.VARCHAR)) {
throw new ValidationException("Redis connector only supports STRING type");
}
}
return TableFunctionProvider.of(new RedisRowDataLookupFunction(options));
}
}
3.9 FlinkJedisConfigUtil
package com.xxx.cn.utils;

// NOTE: fixed class-name casing — the Bahir class is FlinkJedisConfigBase.
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import static com.xxx.cn.common.Constants.*;

/**
 * Translates the connector's table options into the Bahir Jedis config that
 * matches the configured deployment mode (single / cluster / sentinel).
 */
public class FlinkJedisConfigUtil {

    private FlinkJedisConfigUtil() {
        // utility class — no instances
    }

    /**
     * Builds the mode-specific Jedis configuration.
     *
     * @param options validated table options (see RedisDynamicTableFactory)
     * @return a pool, cluster or sentinel config depending on the 'mode' option
     * @throws IllegalArgumentException if 'mode' is not single/cluster/sentinel
     */
    public static FlinkJedisConfigBase getFlinkJedisConfig(ReadableConfig options) {
        switch (options.get(MODE)) {
            case "single":
                return new FlinkJedisPoolConfig.Builder()
                        .setHost(options.get(SINGLE_HOST))
                        .setDatabase(options.get(DB_NUM))
                        .setPort(options.get(SINGLE_PORT))
                        .setPassword(options.get(PASSWORD))
                        .setMaxIdle(options.get(CONNECTION_MAX_IDLE))
                        .setMaxTotal(options.get(CONNECTION_MAX_TOTAL))
                        .setMinIdle(options.get(CONNECTION_MIN_IDLE))
                        .setTimeout(options.get(CONNECTION_TIMEOUT_MS))
                        .build();
            case "cluster":
                // "host1:port1,host2:port2,..." -> socket addresses
                Set<InetSocketAddress> clusterNodes =
                        Arrays.stream(options.get(CLUSTER_NODES).trim().split(SEPARATOR))
                                .map(FlinkJedisConfigUtil::parseHostPort)
                                .collect(Collectors.toSet());
                return new FlinkJedisClusterConfig.Builder()
                        .setNodes(clusterNodes)
                        .setMaxIdle(options.get(CONNECTION_MAX_IDLE))
                        .setMaxTotal(options.get(CONNECTION_MAX_TOTAL))
                        .setMinIdle(options.get(CONNECTION_MIN_IDLE))
                        .setTimeout(options.get(CONNECTION_TIMEOUT_MS))
                        .build();
            case "sentinel":
                // Sentinel builder takes the raw "host:port" strings directly.
                Set<String> sentinelNodes =
                        Arrays.stream(options.get(SENTINEL_NODES).trim().split(SEPARATOR))
                                .collect(Collectors.toSet());
                return new FlinkJedisSentinelConfig.Builder()
                        .setMasterName(options.get(SENTINEL_MASTER))
                        .setSentinels(sentinelNodes)
                        .setDatabase(options.get(DB_NUM))
                        .setPassword(options.get(PASSWORD))
                        .setMaxIdle(options.get(CONNECTION_MAX_IDLE))
                        .setMaxTotal(options.get(CONNECTION_MAX_TOTAL))
                        .setMinIdle(options.get(CONNECTION_MIN_IDLE))
                        .setConnectionTimeout(options.get(CONNECTION_TIMEOUT_MS))
                        .build();
            default:
                throw new IllegalArgumentException("Invalid Redis mode. Must be single/cluster/sentinel");
        }
    }

    /** Parses one "host:port" entry into an {@link InetSocketAddress}. */
    private static InetSocketAddress parseHostPort(String node) {
        String[] parts = node.split(HOST_SEPARATOR);
        return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
    }
}
3.10 RedisDynamicTableFactory
package com.xxx.cn;
import com.xxx.cn.sink.RedisDynamicTableSink;
import com.xxx.cn.source.RedisDynamicTableSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
import static com.xxx.cn.common.Constants.*;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
public class RedisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
validateOptions(options);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
return new RedisDynamicTableSink(options, schema);
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig options = helper.getOptions();
validateOptions(options);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
return new RedisDynamicTableSource(options, schema);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
@Override
public Set> requiredOptions() {
Set> requiredOptions = new HashSet<>();
requiredOptions.add(MODE);
requiredOptions.add(COMMAND);
return requiredOptions;
}
@Override
public Set> optionalOptions() {
Set> optionalOptions = new HashSet<>();
optionalOptions.add(SINGLE_HOST);
optionalOptions.add(SINGLE_PORT);
optionalOptions.add(CLUSTER_NODES);
optionalOptions.add(SENTINEL_NODES);
optionalOptions.add(SENTINEL_MASTER);
optionalOptions.add(PASSWORD);
optionalOptions.add(DB_NUM);
optionalOptions.add(TTL_SEC);
optionalOptions.add(CONNECTION_TIMEOUT_MS);
optionalOptions.add(CONNECTION_MAX_TOTAL);
optionalOptions.add(CONNECTION_MAX_IDLE);
optionalOptions.add(CONNECTION_MIN_IDLE);
optionalOptions.add(CONNECTION_TEST_ON_BORROW);
optionalOptions.add(CONNECTION_TEST_ON_RETURN);
optionalOptions.add(CONNECTION_TEST_WHILE_IDLE);
optionalOptions.add(HASH_ADDITIONAL_KEY);
optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
optionalOptions.add(LOOKUP_CACHE_TTL_SEC);
return optionalOptions;
}
private void validateOptions(ReadableConfig options) {
switch (options.get(MODE)) {
case "single":
if (StringUtils.isEmpty(options.get(SINGLE_HOST))) {
throw new IllegalArgumentException("Parameter single.host must be provided in single mode");
}
break;
case "cluster":
if (StringUtils.isEmpty(options.get(CLUSTER_NODES))) {
throw new IllegalArgumentException("Parameter cluster.nodes must be provided in cluster mode");
}
break;
case "sentinel":
if (StringUtils.isEmpty(options.get(SENTINEL_NODES)) || StringUtils.isEmpty(options.get(SENTINEL_MASTER))) {
throw new IllegalArgumentException("Parameters sentinel.nodes and sentinel.master must be provided in sentinel mode");
}
break;
default:
throw new IllegalArgumentException("Invalid Redis mode. Must be single/cluster/sentinel");
}
}
}



