栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

flink维表查询redis之flink-connector-redis

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

flink维表查询redis之flink-connector-redis

插件名称:flink-connector-redis 插件地址:https://github.com/jeff-zou/flink-connector-redis.git 无法翻墙:https://gitee.com/jeff-zou/flink-connector-redis.git 项目介绍

基于bahir-flink二次开发,相对bahir增加的内容有:Table API, 维表查询。参考了腾讯云与阿里云两家主流云产商的流计算产品,取两家之长,并增加了更丰富的功能。

支持功能对应redis操作命令有:

插入维表查询
setget
hsethget
rpush lpush
incrBy decrBy hincrBy zincrby
sadd zadd pfadd(hyperloglog)
使用方法:

命令行执行 mvn package -DskipTests打包后,将生成的包flink-connector-redis_2.12-1.13.2.jar引入flink lib中即可,无需其它设置。

使用说明:

无需通过primary key来映射redis中的Key,直接由ddl中的字段顺序来决定Key,如:

create table sink_redis(username VARCHAR, passport VARCHAR)  with ('command'='set') 
其中username为key, passport为value.

create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR)  with ('command'='hset') 
其中name为map结构的key, subject为field, score为value.

with参数说明:

字段默认值类型说明
connector(none)Stringredis
host(none)StringRedis IP
port6379IntegerRedis 端口
passwordnullString如果没有设置,则为 null
database0Integer默认使用 db0
maxTotal2Integer最大连接数
maxIdle2Integer最大保持连接数
minIdle1Integer最小保持连接数
timeout2000Integer连接超时时间,单位 ms,默认 1s
cluster-nodes(none)String集群类型 ,当redis-mode为cluster时不为空,支持类型有:sentinels cluster
command(none)String对应上文中的redis命令
redis-mode(none)Integerredis类型: single cluster
lookup.cache.max-rows-1Integerlookup 缓存大小
lookup.cache.ttl-1Integer缓存过期时间
lookup.max-retries3Integerlookup 失败重试次数

其它参数请参考bahir

使用示例:

维表查询:

create table sink_redis(name varchar, level varchar, age varchar) with ( 'connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single','password'='******','command'='hset');

-- 先在redis中插入数据,相当于redis命令: hset 3 3 100 --
insert into sink_redis select * from (values ('3', '3', '100'));
                
create table dim_table (name varchar, level varchar, age varchar) with ('connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single', 'password'='*****','command'='hget', 'maxIdle'='2', 'minIdle'='1', 'lookup.cache.max-rows'='10', 'lookup.cache.ttl'='10', 'lookup.max-retries'='3');
    
-- 随机生成10以内的数据作为数据源 --
-- 其中有一条数据会是: username = 3  level = 3, 会跟上面插入的数据关联 -- 
create table source_table (username varchar, level varchar, proctime as procTime()) with ('connector'='datagen',  'rows-per-second'='1',  'fields.username.kind'='sequence',  'fields.username.start'='1',  'fields.username.end'='10', 'fields.level.kind'='sequence',  'fields.level.start'='1',  'fields.level.end'='10');

create table sink_table(username varchar, level varchar,age varchar) with ('connector'='print');

insert into
	sink_table
select
	s.username,
	s.level,
	d.age
from
	source_table s
left join dim_table for system_time as of s.proctime as d on
	d.name = s.username
	and d.level = s.level;
-- username为3那一行会关联到redisw值,输出为: 3,3,100	

DataStream查询方式

示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.datastream.DataStreamTest.java

hset示例,相当于redis命令:hset tom math 150

        Configuration configuration = new Configuration();
        configuration.setString(REDIS_MODE, REDIS_CLUSTER);
        configuration.setString(REDIS_COMMAND, RedisCommand.HSET.name());

        RedisSinkMapper redisMapper = (RedisSinkMapper)RedisHandlerServices
                .findRedisHandler(RedisMapperHandler.class, configuration.toMap())
                .createRedisMapper(configuration);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        GenericRowData genericRowData = new GenericRowData(3);
        genericRowData.setField(0, "tom");
        genericRowData.setField(1, "math");
        genericRowData.setField(2, "151");
        DataStream dataStream = env.fromElements(genericRowData);

       ResolvedSchema resolvedSchema = ResolvedSchema.physical(new String[]{"name", "subject", "score"}, new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING().notNull(), DataTypes.INT().notNull()});
        FlinkJedisConfigbase conf = getLocalRedisClusterConfig();
        RedisSinkFunction redisSinkFunction = new RedisSinkFunction<>(conf, redisMapper, resolvedSchema);

        dataStream.addSink(redisSinkFunction);
        env.execute("RedisSinkTest");

其它写入示例

示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.table.SQLTest.java

set示例,相当于redis命令: set test test11

   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

        String ddl = "create table sink_redis(username VARCHAR, passport VARCHAR) with ( 'connector'='redis', " +
                "'host'='10.11.80.147','port'='7001', 'redis-mode'='single','password'='******','command'='set')" ;

        tEnv.executeSql(ddl);
        String sql = " insert into sink_redis select * from (values ('test', 'test11'))";
        TableResult tableResult = tEnv.executeSql(sql);
        tableResult.getJobClient().get()
                .getJobExecutionResult()
                .get();
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760173.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号