栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

自定义kafka高效的protoStuff序列化

自定义kafka高效的protoStuff序列化

目前序列化领域中,谷歌的 protobuf 应该是性能好,效率高的了,并且 protobuf 支持多种语言,可跨平台,跨语言

但使用起来并不像其他序列化那么简单(首先要写.proto文件,然后编译.proto文件,生成对应的.java文件)

protostuff 基于Google protobuf,但是提供了更多的功能和更简易的用法。其中,protostuff-runtime 实现了无需预编译对java bean进行protobuf序列化/反序列化的能力。protostuff-runtime的局限是序列化前需预先传入schema,反序列化不负责对象的创建只负责复制,因而必须提供默认构造函数。此外,protostuff 还可以按照protobuf的配置序列化成json/yaml/xml等格式。

在性能上,protostuff不输原生的protobuf,甚至有反超之势。但 protostuff应该只支持 java端,目前没看到可支持其他端

---------------------------------------------------------------------------------------------------------------------------------

自定义kafka高效的protoStuff序列化

最核心的点是Schema,思想都是共通的,像mysql、plusar、prestod等,都会有类似的Schema的概念,数据的结构化,protostuff 使用起来比 protobuf简单了很多,核心代码如下:

1,自定义序列化器

public class ProtostuffSerializer  implements Serializer {

    private static final linkedBuffer BUFFER = linkedBuffer.allocate(linkedBuffer.MIN_BUFFER_SIZE);

    @Override
    public void configure(Map configs, boolean isKey) {

    }

    @Override
    @SuppressWarnings("unchecked")
    public byte[] serialize(String topic, T data) {
        Class dataClass = (Class) data.getClass();
        Schema schema = RuntimeSchema.getSchema(dataClass);
        byte[] byteData;
        try {
            byteData = ProtostuffIOUtil.toByteArray(data, schema, BUFFER);
        } finally {
            BUFFER.clear();
        }
        return byteData;
    }

    @Override
    public void close() {

    }
}

从protoStuff的的源码可以看到,内部会将我们传递进去的dataClass,以一个

ConcurrentHashMap> pojoMapping,保存在内存,所有直接调用RuntimeSchema.getSchema 就可以了,不需要外部在自己缓存这些schema

 

2,反序列化

@KafkaListener(topics = {"protostuff_topic"}, groupId = "group1", containerFactory="kafkaListenerContainerFactory")
    public void test(List> records) {
        System.out.println("拉取消息量: " + records.size());
        for (ConsumerRecord consumerRecord : records){
            System.out.println("消费消息:partition " + consumerRecord.partition() + " value " + consumerRecord.value() );

            Schema schema = RuntimeSchema.getSchema(User.class);
            User user = schema.newMessage();
            ProtostuffIOUtil.mergeFrom(data, user, schema);
            System.out.println(user);
        }
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758480.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号