分享一个项目flink-redis-connector,功能如下:
- 支持Flink SQL写Redis支持Flink SQL读Redis维表(高时效性&提供缓存,非定期全量load的all cache方式)
create table histalarmDim
(
metricKey varchar,
histalarmData ARRAY< varchar >
) with (
'connector' = 'redis',
'host' = '127.0.0.1',
'port' = '6379',
'redis-mode' = 'single',
'key-column' = 'metricKey',
'value-column' = 'histalarmData',
'lookup.hash.enable' = 'false',
'lookup.redis.datatype' = 'list'
);
注意:
'lookup.redis.datatype' = 'list' -- 维表使用下,数组情况需要指定redis实际数据类型 LIST SET SORTED_SET,其他忽略 缓存参数: cache.type -- heap 、 off-heap 支持堆内、堆外缓存方式 cache.max-rows -- 缓存数据量大小 cache.ttl -- 缓存失效时间二:sink表方式
create table sink_redis
(
username VARCHAR,
passport VARCHAR
) with (
'connector' = 'redis',
'host' = '127.0.0.1',
'port' = '6379',
'redis-mode' = 'single',
'key-column' = 'username',
'value-column' = 'passport',
'command' = 'set');
注意:
command参数是指对应的redis操作命令:
LPUSH(RedisDataType.LIST),
RPUSH(RedisDataType.LIST),
SADD(RedisDataType.SET),
SET(RedisDataType.STRING),
SETEX(RedisDataType.STRING),
PFADD(RedisDataType.HYPER_LOG_LOG),
PUBLISH(RedisDataType.PUBSUB),
ZADD(RedisDataType.SORTED_SET),
ZINCRBY(RedisDataType.SORTED_SET),
ZREM(RedisDataType.SORTED_SET),
HSET(RedisDataType.HASH),
HGET(RedisDataType.HASH),
HINCRBY(RedisDataType.HINCRBY),
INCRBY(RedisDataType.STRING),
INCRBY_EX(RedisDataType.STRING),
DECRBY(RedisDataType.STRING),
DESCRBY_EX(RedisDataType.STRING);
redis-mode 是指redis集群部署模式:
single sentinel cluster
具体可以阅读源码,学习了解。



