-
序列化可以简单理解为对象-->字节的过程,同理,反序列化则是相反的过程。为什么需要序列化?因为网络传输只认字节。所以互信的过程依赖于序列化。
-
网络传输的性能等诸多因素,通常会支持多种序列化方式以供使用者插拔使用,一些常用的序列化方案 hessian,kryo,Protostuff、FST 等,其中最快、效果最好的要数 Kryo 和 Protostuff
RedisConfiguration 的配置
-
创建 Redis 连接工厂对象(RedisConnectionFactory)
-
创建 RestTemplate 对象根据 RedisConnectionFactory 对象。
-
配置相关的 RedisSerializaer 组件
@Configurationpublic class RedisConfiguration {
@Bean("redisConnectionFactory") public RedisConnectionFactory redisConnectionFactory(RedisConfigMapper mapper) { List redisConfigs = mapper.getRedisConfig(); List clusterNodes = new ArrayList<>(); for (RedisConfig rc : redisConfigs) { clusterNodes.add(rc.getUrl() + ":" + rc.getPort()); } // 获取Redis集群配置信息 RedisClusterConfiguration rcf = new RedisClusterConfiguration(clusterNodes); return new JedisConnectionFactory(rcf); }
@Bean("redisTemplate") public RedisTemplate
Kryo 序列化实现
Maven 配置文件
org.springframework.boot spring-boot-starterorg.springframework.boot spring-boot-autoconfigureorg.springframework.boot spring-boot-starter-data-rediscom.esotericsoftware kryo4.0.1 de.javakaffee kryo-serializers0.41 com.esotericsoftware kryo-shaded4.0.1
由于其底层依赖于 ASM 技术,与 Spring 等框架可能会发生 ASM 依赖的版本冲突(文档中表示这个冲突还挺容易出现)所以提供了另外一个依赖以供解决此问题:kryo-shaded
Kryo 三种读写方式
如果知道 class 字节码,并且对象不为空
kryo.writeObject(output, classObject); RestClass restClass = kryo.readObject(input, RestClass.class);
快速入门中的序列化/反序列化的方式便是这一种。而 Kryo 考虑到 someObject 可能为 null,也会导致返回的结果为 null,所以提供了第二套读写方式。
如果知道 class 字节码,并且对象可能为空
kryo.writeObjectOrNull(output, classObject);RestClass someObject = kryo.readObjectOrNull(input, RestClass.class);
但这两种方法似乎都不能满足我们的需求,在 RPC 调用中,序列化和反序列化分布在不同的端点,对象的类型确定,我们不想依赖于手动指定参数,最好将字节码的信息直接存放到序列化结果中,在反序列化时自行读取字节码信息。Kryo 考虑到了这一点,于是提供了第三种方式。如果实现类的字节码未知,并且对象可能为 null。
kryo.writeClassAndObject(output, object);Object object = kryo.readClassAndObject(input);if (object instanceof RestClass) {}
我们牺牲了一些空间一些性能去存放字节码信息
支持的序列化类型
上面表格中支持的类型一览无余,这都是其默认支持的。
Kryo kryo = new Kryo();kryo.addDefaultSerializer(RestClass.class, RestSerializer.class);
这样的方式,也可以为一个 Kryo 实例扩展序列化器
Kryo 支持类型:
-
枚举
-
集合、数组
-
子类/多态
-
循环引用
-
内部类
-
泛型
Kryo 反序列化的异常问题
-
Kryo 不支持 Bean 中增删字段,如果使用 Kryo 序列化了一个类,存入了 Redis,对类进行了修改,会导致反序列化的异常。
-
另外需要注意的一点是使用反射创建的一些类序列化的支持。如使用 Arrays.asList();创建的 List 对象,会引起序列化异常。
-
不支持包含无参构造器类的反序列化,尝试反序列化一个不包含无参构造器的类将会得到以下的异常:
-
保证每个类具有无参构造器是应当遵守的编程规范,但实际开发中一些第三库的相关类不包含无参构造,的确是有点麻烦。
Kryo 是线程不安全的
借助 ThreadLocal 来维护以保证其线程安全。
private static final ThreadLocalkryos = new ThreadLocal () { protected Kryo initialValue() { Kryo kryo = new Kryo(); // configure kryo instance, customize settings return kryo; };}; // Somewhere else, use KryoKryo k = kryos.get();...
Kryo 相关配置参数
每个 Kryo 实例都可以拥有两个配置参数。
-
kryo.setRegistrationRequired(false);//关闭注册行为
Kryo 支持对注册行为,如 kryo.register(SomeClazz.class),这会赋予该 Class 一个从 0 开始的编号,但 Kryo 使用注册行为最大的问题在于,其不保证同一个 Class 每一次注册的号码相同,这与注册的顺序有关,也就意味着在不同的机器、同一个机器重启前后都有可能拥有不同的编号,这会导致序列化产生问题,所以在分布式项目中,一般关闭注册行为。
-
kryo.setReferences(true);//支持循环引用
循环引用,Kryo 为了追求高性能,可以关闭循环引用的支持。不过我并不认为关闭它是一件好的选择,大多数情况下,请保持 kryo.setReferences(true)。
常用 Kryo 工具类
public class KryoSerializer {
public byte[] serialize(Object obj) { Kryo kryo = kryoLocal.get(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); Output output = new Output(byteArrayOutputStream);//<1> kryo.writeClassAndObject(output, obj);//<2> output.close(); return byteArrayOutputStream.toByteArray(); }
public T deserialize(byte[] bytes) { Kryo kryo = kryoLocal.get(); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); Input input = new Input(byteArrayInputStream);// <1> input.close(); return (T) kryo.readClassAndObject(input);//<2> }
private static final ThreadLocal kryoLocal = new ThreadLocal() {//<3> @Override protected Kryo initialValue() { Kryo kryo = new Kryo(); kryo.setReferences(true);//默认值为true,强调作用 kryo.setRegistrationRequired(false);//默认值为false,强调作用 return kryo; } };}
-
Kryo 的 Input 和 Output 接收一个 InputStream 和 OutputStream,Kryo 通常完成字节数组和对象的转换,所以常用的输入输出流实现为 ByteArrayInputStream/ByteArrayOutputStream。
-
writeClassAndObject 和 readClassAndObject 配对使用在分布式场景下是最常见的,序列化时将字节码存入序列化结果中,便可以在反序列化时不必要传入字节码信息。
-
使用 ThreadLocal 维护 Kryo 实例,这样减少了每次使用都实例化一次 Kryo 的开销又可以保证其线程安全。
KryoRedisSerializer
数据交换或数据持久化,比如使用 kryo 把对象序列化成字节数组发送给消息队列或者放到 redis 等等应用场景。
public class KryoRedisSerializerprotostuff 序列化实现implements RedisSerializer { private static final String DEFAULT_ENCODING = "UTF-8"; //每个线程的 Kryo 实例 private static final ThreadLocal kryoLocal = new ThreadLocal () { @Override protected Kryo initialValue() { Kryo kryo = new Kryo(); //支持对象循环引用(否则会栈溢出) kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置 //不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册) kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置 //Fix the NPE bug when deserializing Collections. ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()) .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); return kryo; } }; public static Kryo getInstance() { return kryoLocal.get(); } @Override public byte[] serialize(T t) throws SerializationException { byte[] buffer = new byte[2048]; Output output = new Output(buffer); getInstance().writeClassAndObject(output, t); return output.toBytes(); } @Override public T deserialize(byte[] bytes) throws SerializationException { Input input = new Input(bytes); @SuppressWarnings("unchecked") T t = (T) getInstance().readClassAndObject(input); return t; } }
Maven 配置文件
定义一个 ProtoStuffUtil 工具类com.dyuproject.protostuff protostuff-core1.1.3 com.dyuproject.protostuff protostuff-runtime1.1.3
@Slf4jpublic class ProtoStuffUtil { public static byte[] serialize(T obj) { if (obj == null) { log.error("Failed to serializer, obj is null"); throw new RuntimeException("Failed to serializer"); } @SuppressWarnings("unchecked") Schema schema = (Schema) RuntimeSchema.getSchema(obj.getClass()); linkedBuffer buffer = linkedBuffer.allocate(1024 * 1024); byte[] protoStuff; try { protoStuff = ProtostuffIOUtil.toByteArray(obj, schema, buffer); } catch (Exception e) { log.error("Failed to serializer, obj:{}", obj, e); throw new RuntimeException("Failed to serializer"); } finally { buffer.clear(); } return protoStuff; } public static T deserialize(byte[] paramArrayOfByte, Class targetClass) { if (paramArrayOfByte == null || paramArrayOfByte.length == 0) { log.error("Failed to deserialize, byte is empty"); throw new RuntimeException("Failed to deserialize"); } T instance; try { instance = targetClass.newInstance(); } catch (InstantiationException | IllegalAccessException e) { log.error("Failed to deserialize", e); throw new RuntimeException("Failed to deserialize"); } Schema schema = RuntimeSchema.getSchema(targetClass); ProtostuffIOUtil.mergeFrom(paramArrayOfByte, instance, schema); return instance; } public static byte[] serializeList(List objList) { if (objList == null || objList.isEmpty()) { log.error("Failed to serializer, objList is empty"); throw new RuntimeException("Failed to serializer"); } @SuppressWarnings("unchecked") Schema schema = (Schema) RuntimeSchema.getSchema(objList.get(0).getClass()); linkedBuffer buffer = linkedBuffer.allocate(1024 * 1024); byte[] protoStuff; ByteArrayOutputStream bos = null; try { bos = new ByteArrayOutputStream(); ProtostuffIOUtil.writeListTo(bos, objList, schema, buffer); protoStuff = bos.toByteArray(); } catch (Exception e) { log.error("Failed to serializer, obj list:{}", objList, e); throw new RuntimeException("Failed to serializer"); } finally { buffer.clear(); try { if (bos != null) { bos.close(); } } catch (IOException e) { e.printStackTrace(); } } return protoStuff; } public static List deserializeList(byte[] paramArrayOfByte, Class targetClass) { if (paramArrayOfByte == null || paramArrayOfByte.length == 0) { log.error("Failed to deserialize, byte is empty"); throw new RuntimeException("Failed to deserialize"); } Schema schema = RuntimeSchema.getSchema(targetClass); List result; try { result = ProtostuffIOUtil.parseListFrom(new ByteArrayInputStream(paramArrayOfByte), schema); } catch (IOException e) { log.error("Failed to deserialize", e); throw new RuntimeException("Failed to deserialize"); } return result; } }
不修改 RedisSerializer 组件和重写操作
直接自定义 RedisClient 工具类方式,代理 RedisTemplate 的工具类方法
@Componentpublic class RedisClient {
private final RedisTemplate redisTemplate; @Autowired public RedisClient(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } public T get(final String field, Class targetClass) { byte[] result = redisTemplate.execute((RedisCallback) connection -> connection.get(field.getBytes())); if (result == null) { return null; return ProtoStuffUtil.deserialize(result, targetClass); } public void set(String field, T obj) { final byte[] value = ProtoStuffUtil.serialize(obj); redisTemplate.execute((RedisCallback) connection -> { connection.set(field.getBytes(), value); return null; }); } public void setWithExpire(String field, T obj, final long expireTime) { final byte[] value = ProtoStuffUtil.serialize(obj); redisTemplate.execute((RedisCallback) connection -> { connection.setEx(field.getBytes(), expireTime, value); return null; }); } public List getList(final String field, Class targetClass) { byte[] result = redisTemplate.execute((RedisCallback) connection -> connection.get(field.getBytes())); if (result == null) { return null; } return ProtoStuffUtil.deserializeList(result, targetClass); } public void setList(String field, List objList) { final byte[] value = ProtoStuffUtil.serializeList(objList); redisTemplate.execute((RedisCallback) connection -> { connection.set(field.getBytes(), value); return null; }); } public void setListWithExpire(String field, List objList, final long expireTime) { final byte[] value = ProtoStuffUtil.serializeList(objList); redisTemplate.execute((RedisCallback) connection -> { connection.setEx(field.getBytes(), expireTime, value); return null; }); } public T hGet(final String key, final String field, Class targetClass) { byte[] result = redisTemplate .execute((RedisCallback) connection -> connection.hGet(key.getBytes(), field.getBytes())); if (result == null) { return null; } return ProtoStuffUtil.deserialize(result, targetClass); } public boolean hSet(String key, String field, T obj) { final byte[] value = ProtoStuffUtil.serialize(obj); return redisTemplate.execute( (RedisCallback) connection -> connection.hSet(key.getBytes(), field.getBytes(), value)); } public void hSetWithExpire(String key, String field, T obj, long expireTime) { final byte[] value = ProtoStuffUtil.serialize(obj); redisTemplate.execute((RedisCallback) connection -> { connection.hSet(key.getBytes(), field.getBytes(), value); connection.expire(key.getBytes(), expireTime); return null; }); } public List hGetList(final String key, final String field, Class targetClass) { byte[] result = redisTemplate .execute((RedisCallback) connection -> connection.hGet(key.getBytes(), field.getBytes())); if (result == null) { return null; } return ProtoStuffUtil.deserializeList(result, targetClass); } public boolean hSetList(String key, String field, List objList) { final byte[] value = ProtoStuffUtil.serializeList(objList); return redisTemplate.execute( (RedisCallback) connection -> connection.hSet(key.getBytes(), field.getBytes(), value)); } public Map hMGet(String key, Collection fields, Class targetClass) { List byteFields = fields.stream().map(String::getBytes).collect(Collectors.toList()); byte[][] queryFields = new byte[byteFields.size()][]; byteFields.toArray(queryFields); List cache = redisTemplate .execute((RedisCallback>) connection -> connection.hMGet(key.getBytes(), queryFields)); Map results = new HashMap<>(16); Iterator it = fields.iterator(); int index = 0; while (it.hasNext()) { String k = it.next(); if (cache.get(index) == null) { index++; continue; } results.put(k, ProtoStuffUtil.deserialize(cache.get(index), targetClass)); index++; } return results; } public void hMSet(String field, Map values) { Map bytevalues = new HashMap<>(16); for (Map.Entry value : values.entrySet()) { bytevalues.put(value.getKey().getBytes(), ProtoStuffUtil.serialize(value.getValue())); } redisTemplate.execute((RedisCallback) connection -> { connection.hMSet(field.getBytes(), bytevalues); return null; }); } public Map hGetAll(String key, Class targetClass) { Map records = redisTemplate .execute((RedisCallback



