- 用FlinkJedisPoolConfig设置redis的连接信息
- 数据流.addSink()里面new 一个 RedisSink
- 将redis配置和自定义类继承RedisMapper放入RedisSink
- 重写自定义类
// 定义FlinkJedisPoolConfig设置redis的连接信息
val conf = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build()
// addSink里传一个RedisSink
dataStream.addSink(new RedisSink[插入的数据类型](conf, new MyRedisMapper))
// 自定义类继承RedisMapper
class MyRedisMapper extends RedisMapper[插入的数据类型]{
// 定义保存数据的命令描述为“HSET” 和 哈希表名
override def getCommandDescription: RedisCommandDescription =
new RedisCommandDescription(RedisCommand.HSET, "哈希表名")
// 指定key值
override def getKeyFromData(t: 插入的数据类型): String = t.属性
// 指定value值
override def getValueFromData(t: 插入的数据类型): String = t.属性
}