栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Spring 技术特性】采用 protostuff 和 kryo 高性能序列化框架实现 RestTemplate 的序列化组件

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Spring 技术特性】采用 protostuff 和 kryo 高性能序列化框架实现 RestTemplate 的序列化组件

序列化
  • 序列化可以简单理解为对象-->字节的过程,同理,反序列化则是相反的过程。为什么需要序列化?因为网络传输只认字节。所以互信的过程依赖于序列化。

  • 网络传输的性能等诸多因素,通常会支持多种序列化方式以供使用者插拔使用,一些常用的序列化方案 hessian,kryo,Protostuff、FST 等,其中最快、效果最好的要数 Kryo 和 Protostuff

RedisConfiguration 的配置

  1. 创建 Redis 连接工厂对象(RedisConnectionFactory)

  2. 创建 RestTemplate 对象根据 RedisConnectionFactory 对象。

  3. 配置相关的 RedisSerializaer 组件

@Configurationpublic class RedisConfiguration {
    @Bean("redisConnectionFactory")    public RedisConnectionFactory redisConnectionFactory(RedisConfigMapper mapper) {        List redisConfigs = mapper.getRedisConfig();        List clusterNodes = new ArrayList<>();        for (RedisConfig rc : redisConfigs) {            clusterNodes.add(rc.getUrl() + ":" + rc.getPort());        }        // 获取Redis集群配置信息        RedisClusterConfiguration rcf = new RedisClusterConfiguration(clusterNodes);        return new JedisConnectionFactory(rcf);    }
    @Bean("redisTemplate")    public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {        RedisTemplate template = new RedisTemplate<>();        template.setConnectionFactory(redisConnectionFactory);        // redis value使用的序列化器        template.setValueSerializer(new XXXRedisSerializer<>());        // redis key使用的序列化器        template.setKeySerializer(new XXXRedisSerializer<>());        template.setHashKeySerializer(new XXXRedisSerializer<>());        template.setHashValueSerializer(new XXXRedisSerializer<>());        template.afterPropertiesSet();        return template;    }}
Kryo 序列化实现

Maven 配置文件

                    org.springframework.boot            spring-boot-starter                            org.springframework.boot            spring-boot-autoconfigure                            org.springframework.boot            spring-boot-starter-data-redis                            com.esotericsoftware            kryo            4.0.1                            de.javakaffee            kryo-serializers            0.41                      com.esotericsoftware          kryo-shaded          4.0.1          

由于其底层依赖于 ASM 技术,与 Spring 等框架可能会发生 ASM 依赖的版本冲突(文档中表示这个冲突还挺容易出现)所以提供了另外一个依赖以供解决此问题:kryo-shaded

Kryo 三种读写方式

如果知道 class 字节码,并且对象不为空

  kryo.writeObject(output, classObject);  RestClass restClass = kryo.readObject(input, RestClass.class);

快速入门中的序列化/反序列化的方式便是这一种。而 Kryo 考虑到 someObject 可能为 null,也会导致返回的结果为 null,所以提供了第二套读写方式。

如果知道 class 字节码,并且对象可能为空

kryo.writeObjectOrNull(output, classObject);RestClass someObject = kryo.readObjectOrNull(input, RestClass.class);

但这两种方法似乎都不能满足我们的需求,在 RPC 调用中,序列化和反序列化分布在不同的端点,对象的类型确定,我们不想依赖于手动指定参数,最好将字节码的信息直接存放到序列化结果中,在反序列化时自行读取字节码信息。Kryo 考虑到了这一点,于是提供了第三种方式。如果实现类的字节码未知,并且对象可能为 null。

  kryo.writeClassAndObject(output, object);Object object = kryo.readClassAndObject(input);if (object instanceof RestClass) {}

我们牺牲了一些空间一些性能去存放字节码信息

支持的序列化类型

上面表格中支持的类型一览无余,这都是其默认支持的。

Kryo kryo = new Kryo();kryo.addDefaultSerializer(RestClass.class, RestSerializer.class);

这样的方式,也可以为一个 Kryo 实例扩展序列化器

Kryo 支持类型:

  • 枚举

  • 集合、数组

  • 子类/多态

  • 循环引用

  • 内部类

  • 泛型

Kryo 反序列化的异常问题

  • Kryo 不支持 Bean 中增删字段,如果使用 Kryo 序列化了一个类,存入了 Redis,对类进行了修改,会导致反序列化的异常。

  • 另外需要注意的一点是使用反射创建的一些类序列化的支持。如使用 Arrays.asList();创建的 List 对象,会引起序列化异常。

  • 不支持包含无参构造器类的反序列化,尝试反序列化一个不包含无参构造器的类将会得到以下的异常:

  • 保证每个类具有无参构造器是应当遵守的编程规范,但实际开发中一些第三库的相关类不包含无参构造,的确是有点麻烦。

Kryo 是线程不安全的

借助 ThreadLocal 来维护以保证其线程安全。

private static final ThreadLocal kryos = new ThreadLocal() {    protected Kryo initialValue() {        Kryo kryo = new Kryo();        // configure kryo instance, customize settings        return kryo;    };};
// Somewhere else, use KryoKryo k = kryos.get();...

Kryo 相关配置参数

每个 Kryo 实例都可以拥有两个配置参数。

  • kryo.setRegistrationRequired(false);//关闭注册行为

Kryo 支持对注册行为,如 kryo.register(SomeClazz.class),这会赋予该 Class 一个从 0 开始的编号,但 Kryo 使用注册行为最大的问题在于,其不保证同一个 Class 每一次注册的号码相同,这与注册的顺序有关,也就意味着在不同的机器、同一个机器重启前后都有可能拥有不同的编号,这会导致序列化产生问题,所以在分布式项目中,一般关闭注册行为。

  • kryo.setReferences(true);//支持循环引用

循环引用,Kryo 为了追求高性能,可以关闭循环引用的支持。不过我并不认为关闭它是一件好的选择,大多数情况下,请保持 kryo.setReferences(true)。

常用 Kryo 工具类

public class KryoSerializer {
    public byte[] serialize(Object obj) {        Kryo kryo = kryoLocal.get();        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();        Output output = new Output(byteArrayOutputStream);//<1>        kryo.writeClassAndObject(output, obj);//<2>        output.close();        return byteArrayOutputStream.toByteArray();    }
    public  T deserialize(byte[] bytes) {        Kryo kryo = kryoLocal.get();        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);        Input input = new Input(byteArrayInputStream);// <1>        input.close();        return (T) kryo.readClassAndObject(input);//<2>    }
    private static final ThreadLocal kryoLocal = new ThreadLocal() {//<3>        @Override        protected Kryo initialValue() {            Kryo kryo = new Kryo();            kryo.setReferences(true);//默认值为true,强调作用            kryo.setRegistrationRequired(false);//默认值为false,强调作用            return kryo;        }    };}
  1. Kryo 的 Input 和 Output 接收一个 InputStream 和 OutputStream,Kryo 通常完成字节数组和对象的转换,所以常用的输入输出流实现为 ByteArrayInputStream/ByteArrayOutputStream。

  2. writeClassAndObject 和 readClassAndObject 配对使用在分布式场景下是最常见的,序列化时将字节码存入序列化结果中,便可以在反序列化时不必要传入字节码信息。

  3. 使用 ThreadLocal 维护 Kryo 实例,这样减少了每次使用都实例化一次 Kryo 的开销又可以保证其线程安全。

KryoRedisSerializer

数据交换或数据持久化,比如使用 kryo 把对象序列化成字节数组发送给消息队列或者放到 redis 等等应用场景。

public class KryoRedisSerializer implements RedisSerializer {
    private static final String DEFAULT_ENCODING = "UTF-8";     //每个线程的 Kryo 实例    private static final ThreadLocal kryoLocal = new ThreadLocal() {        @Override        protected Kryo initialValue() {            Kryo kryo = new Kryo();                         //支持对象循环引用(否则会栈溢出)            kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置             //不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册)            kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置             //Fix the NPE bug when deserializing Collections.            ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())                    .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());             return kryo;        }    };         public static Kryo getInstance() {        return kryoLocal.get();    }
     @Override    public byte[] serialize(T t) throws SerializationException {        byte[] buffer = new byte[2048];        Output output = new Output(buffer);        getInstance().writeClassAndObject(output, t);        return output.toBytes();    }
    @Override    public T deserialize(byte[] bytes) throws SerializationException {        Input input = new Input(bytes);        @SuppressWarnings("unchecked")        T t = (T) getInstance().readClassAndObject(input);        return t;    }
}
protostuff 序列化实现

Maven 配置文件

    com.dyuproject.protostuff    protostuff-core    1.1.3    com.dyuproject.protostuff    protostuff-runtime    1.1.3
定义一个 ProtoStuffUtil 工具类
@Slf4jpublic class ProtoStuffUtil {        public static  byte[] serialize(T obj) {        if (obj == null) {            log.error("Failed to serializer, obj is null");            throw new RuntimeException("Failed to serializer");        }         @SuppressWarnings("unchecked") Schema schema = (Schema) RuntimeSchema.getSchema(obj.getClass());        linkedBuffer buffer = linkedBuffer.allocate(1024 * 1024);        byte[] protoStuff;        try {            protoStuff = ProtostuffIOUtil.toByteArray(obj, schema, buffer);        } catch (Exception e) {            log.error("Failed to serializer, obj:{}", obj, e);            throw new RuntimeException("Failed to serializer");        } finally {            buffer.clear();        }        return protoStuff;    }         public static  T deserialize(byte[] paramArrayOfByte, Class targetClass) {        if (paramArrayOfByte == null || paramArrayOfByte.length == 0) {            log.error("Failed to deserialize, byte is empty");            throw new RuntimeException("Failed to deserialize");        }         T instance;        try {            instance = targetClass.newInstance();        } catch (InstantiationException | IllegalAccessException e) {            log.error("Failed to deserialize", e);            throw new RuntimeException("Failed to deserialize");        }         Schema schema = RuntimeSchema.getSchema(targetClass);        ProtostuffIOUtil.mergeFrom(paramArrayOfByte, instance, schema);        return instance;    }         public static  byte[] serializeList(List objList) {        if (objList == null || objList.isEmpty()) {            log.error("Failed to serializer, objList is empty");            throw new RuntimeException("Failed to serializer");        }         @SuppressWarnings("unchecked") Schema schema =                (Schema) RuntimeSchema.getSchema(objList.get(0).getClass());        linkedBuffer buffer = linkedBuffer.allocate(1024 * 1024);        byte[] protoStuff;        ByteArrayOutputStream bos = null;        try {            bos = new ByteArrayOutputStream();            ProtostuffIOUtil.writeListTo(bos, objList, schema, buffer);            protoStuff = bos.toByteArray();        } catch (Exception e) {            log.error("Failed to serializer, obj list:{}", objList, e);            throw new RuntimeException("Failed to serializer");        } finally {            buffer.clear();            try {                if (bos != null) {                    bos.close();                }            } catch (IOException e) {                e.printStackTrace();            }        }         return protoStuff;    }         public static  List deserializeList(byte[] paramArrayOfByte, Class targetClass) {        if (paramArrayOfByte == null || paramArrayOfByte.length == 0) {            log.error("Failed to deserialize, byte is empty");            throw new RuntimeException("Failed to deserialize");        }         Schema schema = RuntimeSchema.getSchema(targetClass);        List result;        try {            result = ProtostuffIOUtil.parseListFrom(new ByteArrayInputStream(paramArrayOfByte), schema);        } catch (IOException e) {            log.error("Failed to deserialize", e);            throw new RuntimeException("Failed to deserialize");        }        return result;    } }

不修改 RedisSerializer 组件和重写操作

直接自定义 RedisClient 工具类方式,代理 RedisTemplate 的工具类方法

@Componentpublic class RedisClient {
    private final RedisTemplate redisTemplate;     @Autowired    public RedisClient(RedisTemplate redisTemplate) {        this.redisTemplate = redisTemplate;    }         public  T get(final String field, Class targetClass) {        byte[] result = redisTemplate.execute((RedisCallback) connection -> connection.get(field.getBytes()));        if (result == null) {            return null;        return ProtoStuffUtil.deserialize(result, targetClass);    }         public  void set(String field, T obj) {        final byte[] value = ProtoStuffUtil.serialize(obj);        redisTemplate.execute((RedisCallback) connection -> {            connection.set(field.getBytes(), value);            return null;        });    }         public  void setWithExpire(String field, T obj, final long expireTime) {        final byte[] value = ProtoStuffUtil.serialize(obj);        redisTemplate.execute((RedisCallback) connection -> {            connection.setEx(field.getBytes(), expireTime, value);            return null;        });    }         public  List getList(final String field, Class targetClass) {        byte[] result = redisTemplate.execute((RedisCallback) connection -> connection.get(field.getBytes()));        if (result == null) {            return null;        }         return ProtoStuffUtil.deserializeList(result, targetClass);    }         public  void setList(String field, List objList) {        final byte[] value = ProtoStuffUtil.serializeList(objList);        redisTemplate.execute((RedisCallback) connection -> {            connection.set(field.getBytes(), value);            return null;        });    }         public  void setListWithExpire(String field, List objList, final long expireTime) {        final byte[] value = ProtoStuffUtil.serializeList(objList);        redisTemplate.execute((RedisCallback) connection -> {            connection.setEx(field.getBytes(), expireTime, value);            return null;        });    }         public  T hGet(final String key, final String field, Class targetClass) {        byte[] result = redisTemplate                .execute((RedisCallback) connection -> connection.hGet(key.getBytes(), field.getBytes()));        if (result == null) {            return null;        }         return ProtoStuffUtil.deserialize(result, targetClass);    }         public  boolean hSet(String key, String field, T obj) {        final byte[] value = ProtoStuffUtil.serialize(obj);        return redisTemplate.execute(                (RedisCallback) connection -> connection.hSet(key.getBytes(), field.getBytes(), value));    }         public  void hSetWithExpire(String key, String field, T obj, long expireTime) {        final byte[] value = ProtoStuffUtil.serialize(obj);        redisTemplate.execute((RedisCallback) connection -> {            connection.hSet(key.getBytes(), field.getBytes(), value);            connection.expire(key.getBytes(), expireTime);            return null;        });    }         public  List hGetList(final String key, final String field, Class targetClass) {        byte[] result = redisTemplate                .execute((RedisCallback) connection -> connection.hGet(key.getBytes(), field.getBytes()));        if (result == null) {            return null;        }         return ProtoStuffUtil.deserializeList(result, targetClass);    }         public  boolean hSetList(String key, String field, List objList) {        final byte[] value = ProtoStuffUtil.serializeList(objList);        return redisTemplate.execute(                (RedisCallback) connection -> connection.hSet(key.getBytes(), field.getBytes(), value));    }         public  Map hMGet(String key, Collection fields, Class targetClass) {        List byteFields = fields.stream().map(String::getBytes).collect(Collectors.toList());        byte[][] queryFields = new byte[byteFields.size()][];        byteFields.toArray(queryFields);        List cache = redisTemplate                .execute((RedisCallback>) connection -> connection.hMGet(key.getBytes(), queryFields));         Map results = new HashMap<>(16);        Iterator it = fields.iterator();        int index = 0;        while (it.hasNext()) {            String k = it.next();            if (cache.get(index) == null) {                index++;                continue;            }             results.put(k, ProtoStuffUtil.deserialize(cache.get(index), targetClass));            index++;        }         return results;    }         public  void hMSet(String field, Map values) {        Map bytevalues = new HashMap<>(16);        for (Map.Entry value : values.entrySet()) {            bytevalues.put(value.getKey().getBytes(), ProtoStuffUtil.serialize(value.getValue()));        }         redisTemplate.execute((RedisCallback) connection -> {            connection.hMSet(field.getBytes(), bytevalues);            return null;        });    }         public  Map hGetAll(String key, Class targetClass) {        Map records = redisTemplate                .execute((RedisCallback>) connection -> connection.hGetAll(key.getBytes()));         Map ret = new HashMap<>(16);        for (Map.Entry record : records.entrySet()) {            T obj = ProtoStuffUtil.deserialize(record.getValue(), targetClass);            ret.put(new String(record.getKey()), obj);        }         return ret;    }         public  T lIndex(String key, int index, Class targetClass) {        byte[] value =                redisTemplate.execute((RedisCallback) connection -> connection.lIndex(key.getBytes(), index));        return ProtoStuffUtil.deserialize(value, targetClass);    }         public  List lRange(String key, int start, int end, Class targetClass) {        List value = redisTemplate                .execute((RedisCallback>) connection -> connection.lRange(key.getBytes(), start, end));        return value.stream().map(record -> ProtoStuffUtil.deserialize(record, targetClass))                .collect(Collectors.toList());    }         public  void lPush(String key, T obj) {        final byte[] value = ProtoStuffUtil.serialize(obj);        redisTemplate.execute((RedisCallback) connection -> connection.lPush(key.getBytes(), value));    }         public  void lPush(String key, List objList) {        List byteFields = objList.stream().map(ProtoStuffUtil::serialize).collect(Collectors.toList());        byte[][] values = new byte[byteFields.size()][];         redisTemplate.execute((RedisCallback) connection -> connection.lPush(key.getBytes(), values));    }         public void deleteCache(String key) {        redisTemplate.delete(key);    }        public void zAdd(String redisKey, ImmutablePair immutablePair) {        final byte[] key = redisKey.getBytes();        final byte[] value = immutablePair.getLeft().getBytes();        redisTemplate.execute((RedisCallback) connection -> connection                .zAdd(key, immutablePair.getRight().doublevalue(), value));     }         public List> zRangeWithScores(String redisKey, int start, int end) {        Set items = redisTemplate.execute(                (RedisCallback>) connection -> connection                        .zRangeWithScores(redisKey.getBytes(), start, end));        return items.stream()                .map(record -> ImmutablePair.of(new String(record.getValue()), BigDecimal.valueOf(record.getScore())))                .collect(Collectors.toList());    }        public List> zRevRangeWithScores(String redisKey, int start, int end) {        Set items = redisTemplate.execute(                (RedisCallback>) connection -> connection                        .zRevRangeWithScores(redisKey.getBytes(), start, end));        return items.stream()                .map(record -> ImmutablePair.of(new String(record.getValue()), BigDecimal.valueOf(record.getScore())))                .collect(Collectors.toList());    }}
小编这里还有一篇《细节理解!阿里内部 Java 高并发系统设计全彩手册曝光!霸榜 GitHub》 感兴趣的可以点击观看哦
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/340846.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号