flink-connector-redis
查询Flink连接器,最简单的就是查询关键字flink-connector-
这里将Redis当作sink的输出对象。
1. pom依赖
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
2. 编写代码
package com.zch.apitest.sink;
import com.zch.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SinkTest2_Redis {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取文件
DataStream inputStream = env.readTextFile("F:\JAVA\bigdata2107\zch\flink\src\main\resources\Sensor.txt");
SingleOutputStreamOperator dataStream = inputStream.map(lines -> {
String[] split = lines.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
});
// 定义jedis连接配置
FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig
.Builder()
.setHost("192.168.235.10")
.setPort(6379)
.build();
dataStream.addSink(new RedisSink<>(flinkJedisPoolConfig,new MyRedisMapper()));
env.execute();
}
// 自定义RedisMapper
public static class MyRedisMapper implements RedisMapper{
// 定义保存数据到redis的命令,存成hash表,hset sensor_temp id temperature
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");
}
@Override
public String getKeyFromData(SensorReading sensorReading) {
return sensorReading.getId();
}
@Override
public String getValueFromData(SensorReading sensorReading) {
return sensorReading.getTemperature().toString();
}
}
}
启动redis服务(我这里是docker里的)
启动Flink程序
查看Redis里的数据
因为最新数据覆盖前面的,所以最后redis里呈现的是最新的数据。
localhost:6379> hgetall sensor_temp
1) "sensor_1"
2) "37.1"
3) "sensor_6"
4) "15.4"
5) "sensor_7"
6) "6.7"
7) "sensor_10"
8) "38.1"



