引入依赖编写redis配置自定义RedisMapper完整代码
引入依赖编写redis配置org.apache.flink flink-streaming-scala_2.11 1.10.2 org.apache.flink flink-scala_2.11 1.10.2 org.apache.bahir flink-connector-redis_2.11 1.0
val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build()
自定义RedisMapper
这里面的方法根据方法名应该都能看出来是什么意思
class MyRedisMapper extends RedisMapper[SensorReading]{
// 写入redis的命令
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
}
override def getKeyFromData(t: SensorReading): String = {
t.id
}
override def getValueFromData(t: SensorReading): String = {
t.temperature.toString
}
}
对应着redis的添加命令
package com.lzr.sinktest
import com.lzr.apiTest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisConfigbase, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object RedisSinkTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream: DataStream[String] = env.readTextFile("data/sensor.txt")
// 简单转换
val dataStream: DataStream[SensorReading] = inputStream
.map(data => {
val fields: Array[String] = data.split(",")
SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
})
val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build()
dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))
env.execute("redis sink")
}
}
class MyRedisMapper extends RedisMapper[SensorReading]{
// 写入redis的命令
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand., "sensor_temp")
}
override def getKeyFromData(t: SensorReading): String = {
t.id
}
override def getValueFromData(t: SensorReading): String = {
t.temperature.toString
}
}



