栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink sink redis 写入Redis

Flink sink redis 写入Redis

1.依赖 


        
            org.apache.flink
            flink-java
            1.10.1
        
        
            org.apache.flink
            flink-streaming-java_2.12
            1.10.1
        
        
            org.apache.flink
            flink-connector-kafka-0.11_2.12
            1.10.1
        
        
            org.apache.bahir
            flink-connector-redis_2.11
            1.0
        
        
            org.apache.flink
            flink-connector-elasticsearch6_2.12
            1.10.1
        
        
            mysql
            mysql-connector-java
            5.1.44
        
        
            org.apache.flink
            flink-statebackend-rocksdb_2.12
            1.10.1
        
        
            org.apache.flink
            flink-table-planner_2.12
            1.10.1
        
        
            org.apache.flink
            flink-table-planner-blink_2.12
            1.10.1
        
        
            org.apache.flink
            flink-csv
            1.10.1
        

    

2.实体

// 传感器温度读数的数据类型
public class SensorReading {
    // 属性:id,时间戳,温度值
    private String id;
    private Long timestamp;
    private Double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + ''' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}

3.代码

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 
public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从文件读取数据
        DataStream inputStream = env.readTextFile("D:\sensor.txt");

        // 转换成SensorReading类型
        DataStream dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 定义jedis连接配置
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
//            .setPassword("aaa")  设置密码
                .build();

        dataStream.addSink( new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // 自定义RedisMapper
    public static class MyRedisMapper implements RedisMapper{
        // 定义保存数据到redis的命令,存成Hash表,hset sensor_temp id temperature
        // 127.0.0.1:6379> HGET sensor_temp sensor_1     获取存的数据
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}

4.

D:\sensor.txt
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583740.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号