栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink实现自定义RedisSink保存字节数组类型的数据

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink实现自定义RedisSink保存字节数组类型的数据

Flink 的 Redis 依赖 flink-connector-redis 里默认实现只能保存 String 类型的数据, 但很多时候开发需要保存更多其他类型的数据, 比如保存 ProtoBuf 数据的时候会更多选择将 ProtoBuf 对象转换成字节数组进行保存. 所以这里会简单实现自定义 RedisSink 保存字节数组的代码.

依赖


    org.apache.bahir
    flink-connector-redis_2.11
    1.1-SNAPSHOT

实现时为了方便直接将所有的类都放到同一个java文件

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;


public class SinkRedis {

    public static void main(String[] args) throws Exception {

        // Kafka Source 配置
        KafkaSource source = KafkaSource.builder()
                .setBootstrapServers("127.0.0.1:9092")
                .setTopics("test")
                .setGroupId("SourceKafka")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
                .setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
                .setValueonlyDeserializer(new SimpleStringSchema())
                .build();

        // Redis Sink 配置
        MyRedisSink redisSink = new MyRedisSink<>(
                "JedisPool",
                new MyRedisMapper() {
                    // 保存数据的命令
                    @Override
                    public RedisCommandDescription getCommandDescription() {
                        return new RedisCommandDescription(RedisCommand.SET);
                    }
                    // 键值对的key
                    @Override
                    public byte[] getKeyFromData(String data) {
                        return data.split(" ")[0].getBytes(StandardCharsets.UTF_8);
                    }
                    // 键值对的value
                    @Override
                    public byte[] getValueFromData(String data) {
                        return data.split(" ")[1].getBytes(StandardCharsets.UTF_8);
                    }
                }
        );

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").addSink(redisSink);

        env.execute();
    }
}


class MyRedisSink extends RichSinkFunction {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MyRedisSink.class);

    private String flag; // 单机还是集群模式, 值是JedisPool或者JedisCluster, 可以自由扩展
    private RedisCommand redisCommand; // Redis命令, 简单实现了set, 可以自由扩展
    private MyRedisMapper redisSinkMapper; // 解析输入数据的逻辑
    private MyRedisCommandsContainer redisCommandsContainer; // 创建Redis客户端容器

    public MyRedisSink(String flag, MyRedisMapper redisSinkMapper) {
        this.flag = flag;
        this.redisSinkMapper = redisSinkMapper;
        this.redisCommand = redisSinkMapper.getCommandDescription().getCommand();
    }

    public void invoke(IN input, Context context) {
        byte[] key = this.redisSinkMapper.getKeyFromData(input);
        byte[] value = this.redisSinkMapper.getValueFromData(input);
        switch (this.redisCommand) {
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + this.redisCommand);
        }
    }

    public void open(Configuration parameters) throws Exception {
        try {
            if (flag == null || (Objects.equals("JedisPool", flag) && Objects.equals("JedisCluster", flag))) {
                throw new RuntimeException("Flag type must be sure");
            }
            this.redisCommandsContainer = MyRedisCommandsContainerBuilder.build(flag);
            this.redisCommandsContainer.open();
        } catch (Exception e) {
            LOG.error("Redis has not been properly initialized: ", e);
            throw e;
        }
    }
    
    public void close() throws IOException {
        if (this.redisCommandsContainer != null) {
            this.redisCommandsContainer.close();
        }
    }
}


interface MyRedisMapper extends Function, Serializable {

    RedisCommandDescription getCommandDescription(); // Redis命令

    byte[] getKeyFromData(T data); // key

    byte[] getValueFromData(T data); // value
}


class MyRedisCommandsContainerBuilder {

    public MyRedisCommandsContainerBuilder() {

    }

    public static MyRedisCommandsContainer build(String flag) {
        if (Objects.equals("JedisPool", flag)) {
            return buildJedisPool();
        } else if (Objects.equals("JedisCluster", flag)) {
            return buildJedisCluster();
        }
        return null;
    }

    // 单机
    private static MyRedisCommandsContainer buildJedisPool() {

        // 连接池配置
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(8); // 资源池中的最大连接数
        config.setMaxIdle(8); // 资源池允许的最大空闲连接数
        config.setMinIdle(0); // 资源池确保的最少空闲连接数
        config.setBlockWhenExhausted(true); // 当资源池用尽后调用者是否要等待,只有当值为true时下面的maxWaitMillis才会生效
        config.setMaxWaitMillis(30000); // 当资源池连接用尽后,调用者的最大等待时间
        config.setTestWhileIdle(true); // 是否开启空闲资源检测
        config.setTimeBetweenEvictionRunsMillis(60000); // 空闲资源的检测周期
        config.setMinEvictableIdleTimeMillis(900000); // 资源池中资源的最小空闲时间,达到此值后空闲资源将被移除
        config.setNumTestsPerEvictionRun(3); // 做空闲资源检测时,每次检测资源的个数

        return new MyRedisContainer(new JedisPool(config, "127.0.0.1", 6379));
    }

    // 集群
    private static MyRedisCommandsContainer buildJedisCluster() {

        // 连接池配置
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(100); // 资源池中的最大连接数
        config.setMaxIdle(50); // 资源池允许的最大空闲连接数
        config.setMinIdle(20); // 资源池确保的最少空闲连接数
        config.setBlockWhenExhausted(true); // 当资源池用尽后调用者是否要等待,只有当值为true时下面的maxWaitMillis才会生效
        config.setMaxWaitMillis(30000); // 当资源池连接用尽后,调用者的最大等待时间
        config.setTestWhileIdle(true); // 是否开启空闲资源检测
        config.setTimeBetweenEvictionRunsMillis(60000); // 空闲资源的检测周期
        config.setMinEvictableIdleTimeMillis(900000); // 资源池中资源的最小空闲时间,达到此值后空闲资源将被移除
        config.setNumTestsPerEvictionRun(5); // 做空闲资源检测时,每次检测资源的个数
        // 集群信息
        Set nodes = new HashSet<>();
        nodes.add(new HostAndPort("192.168.175.130", 7201));
        nodes.add(new HostAndPort("192.168.175.131", 7201));
        nodes.add(new HostAndPort("192.168.175.132", 7201));

        return new MyRedisClusterContainer(new JedisCluster(nodes, config));
    }
}


interface MyRedisCommandsContainer extends Serializable {

    void open() throws Exception; 

    void set(byte[] key, byte[] value);

    void close() throws IOException;
}


class MyRedisContainer implements MyRedisCommandsContainer, Closeable {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MyRedisContainer.class);

    private transient JedisPool jedisPool;

    public MyRedisContainer(JedisPool jedisPool) {
        Objects.requireNonNull(jedisPool, "Jedis Pool can not be null");
        this.jedisPool = jedisPool;
    }

    private Jedis getInstance() {
        return this.jedisPool.getResource();
    }

    @Override
    public void open() {
        this.getInstance().echo("Test");
    }

    @Override
    public void close() {
        if (this.jedisPool != null) {
            this.jedisPool.close();
        }
    }

    @Override
    public void set(byte[] key, byte[] value) {
        Jedis jedis = null;
        try {
            jedis = this.getInstance();
            jedis.set(key, value);
        } catch (Exception e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Cannot send Redis message with command SET to key {} error message {}", key, e.getMessage());
            }
            throw e;
        } finally {
            this.releaseInstance(jedis);
        }
    }

    private void releaseInstance(Jedis jedis) {
        if (jedis != null) {
            try {
                jedis.close();
            } catch (Exception e) {
                LOG.error("Failed to close (return) instance to pool", e);
            }
        }
    }
}


class MyRedisClusterContainer implements MyRedisCommandsContainer, Closeable {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(MyRedisClusterContainer.class);

    private transient JedisCluster jedisCluster;

    public MyRedisClusterContainer(JedisCluster jedisCluster) {
        Objects.requireNonNull(jedisCluster, "Jedis cluster can not be null");
        this.jedisCluster = jedisCluster;
    }

    @Override
    public void open() {
        this.jedisCluster.echo("Test");
    }

    @Override
    public void close() throws IOException {
        this.jedisCluster.close();
    }

    @Override
    public void set(byte[] key, byte[] value) {
        try {
            this.jedisCluster.set(key, value);
        } catch (Exception e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Cannot send Redis message with command SET to key {} error message {}", key, e.getMessage());
            }
            throw e;
        }
    }
}

这里只是简单写的demo, 只有单机和集群的Redis实现, 命令实现也只有set, 以及很多硬编码的地方, 还有很多可以优化和改进的地方, 大家可以根据自己实际需求点进源码里面仿照着实现更多功能.

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/682514.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号