Implementing a Netty-Based RPC Framework

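It has been a while since I wrote this code, so this is only a brief summary.
I implemented a simple Netty-based RPC framework and registered its services with Nacos. This write-up covers serialization, the custom protocol, the load-balancing algorithms, and the Nacos-related services.
A link to the source code is at the end of the article.
The well-known distributed service framework Dubbo uses the Dubbo protocol for communication between nodes, and the Dubbo protocol uses Netty as its default underlying communication component. Other systems, such as RocketMQ, and ZooKeeper when configured to do so, also use Netty for their low-level network communication. Learning Netty is therefore essential to understanding how these frameworks work.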
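Serialization:
Serialization is indispensable for transmitting data between nodes, and choosing a suitable algorithm matters: it should generally be fast and produce few bytes. I prepared the following implementations: the JDK's built-in serialization, JSON, and a Protobuf placeholder that I apparently never finished for lack of time. I will fill it in when I have time.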

    Java {
        @Override
        @SuppressWarnings("unchecked")
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            // JDK built-in deserialization
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) ois.readObject();
            } catch (IOException | ClassNotFoundException e) {
                log.error("JDK deserialization failed", e);
                // Fail loudly instead of returning null to the caller
                throw new RuntimeException("JDK deserialization failed", e);
            }
        }

        @Override
        public <T> byte[] serialize(T object) {
            // JDK built-in serialization; the object must implement java.io.Serializable
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(object);
            } catch (IOException e) {
                log.error("JDK serialization failed", e);
                throw new RuntimeException("JDK serialization failed", e);
            }
            // The stream is closed (and therefore flushed) before the bytes are read
            return bos.toByteArray();
        }
    },
    
    Json {
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            // ClassCodec lets Gson read java.lang.Class fields from their class names
            Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new SerializeType.ClassCodec()).create();
            String json = new String(bytes, StandardCharsets.UTF_8);
            log.debug("deserialize--{}", json);
            return gson.fromJson(json, clazz);
        }

        @Override
        public <T> byte[] serialize(T object) {
            // Register the same ClassCodec as in deserialize: a plain Gson instance
            // refuses to serialize java.lang.Class fields without a type adapter
            Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new SerializeType.ClassCodec()).create();
            String json = gson.toJson(object);
            log.debug("serialize--{}", json);
            return json.getBytes(StandardCharsets.UTF_8);
        }
    },
    
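    Protobuf {
        // Placeholder: Protobuf support is not implemented yet
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            throw new UnsupportedOperationException("Protobuf deserialization is not implemented yet");
        }

        @Override
        public <T> byte[] serialize(T object) {
            throw new UnsupportedOperationException("Protobuf serialization is not implemented yet");
        }
    };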


    
    // Gson adapter that writes a java.lang.Class as its fully qualified name and reads it back
    class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {

        @Override
        public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
            try {
                // JSON string -> Class, e.g. "java.lang.String" -> String.class
                String str = json.getAsString();
                return Class.forName(str);
            } catch (ClassNotFoundException e) {
                throw new JsonParseException(e);
            }
        }

        @Override
        public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
            // Class -> JSON string, e.g. String.class -> "java.lang.String"
            return new JsonPrimitive(src.getName());
        }
    }
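
The enum constants above override `serialize` and `deserialize`, but the interface they implement is not shown. The following is a minimal sketch of what such a contract could look like, using only the JDK so it runs without Gson or Netty. The names `Serializer`, `JdkSerializer`, and `SerializerDemo` are assumptions made for illustration and are not taken from the original project. It also prints the size of the JDK form next to the raw UTF-8 bytes, which illustrates why the number of bytes an algorithm produces is worth considering.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Entry point placed first so the file can be launched directly with `java SerializerDemo.java`.
class SerializerDemo {
    public static void main(String[] args) {
        String payload = "hello rpc";
        byte[] jdkBytes = JdkSerializer.INSTANCE.serialize(payload);
        byte[] rawBytes = payload.getBytes(StandardCharsets.UTF_8);
        String back = JdkSerializer.INSTANCE.deserialize(String.class, jdkBytes);

        System.out.println("round trip ok: " + payload.equals(back));
        System.out.println("JDK bytes: " + jdkBytes.length + ", raw UTF-8 bytes: " + rawBytes.length);
    }
}

// Hypothetical contract that each serialization algorithm implements.
interface Serializer {
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    <T> byte[] serialize(T object);
}

// JDK-only implementation of the hypothetical contract.
enum JdkSerializer implements Serializer {
    INSTANCE;

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            // clazz.cast checks the type at runtime instead of using an unchecked cast
            return clazz.cast(ois.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public <T> byte[] serialize(T object) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }
}
```

The JDK form carries a stream header and type information in addition to the payload, so it comes out larger than the raw UTF-8 bytes of the same string.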

Custom protocol:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.util.List;

// Frame layout: magic (5) | version (1) | serializer (1) | message type (1) | sequence id (8) | length (4) | body
// Must sit after the frame decoder in the pipeline so that decode always sees one complete frame.
@ChannelHandler.Sharable
public class MessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message message, List<Object> outList) throws Exception {
        // Serialize first: a failure propagates before any buffer is allocated,
        // instead of sending a frame with an empty body
        byte[] bytes = Config.SERIALIZER.serialize(message);
        ByteBuf out = ctx.alloc().buffer();
        // Magic number, 5 bytes
        out.writeBytes(new byte[]{'b', 'a', 'i', 'y', 'e'});
        // Version, 1 byte
        out.writeByte(1);
        // Serialization algorithm, 1 byte
        out.writeByte(Config.SERIALIZER.ordinal());
        // Message type, 1 byte
        out.writeByte(message.getMessageType());
        // Sequence id, 8 bytes (big-endian, matching readLong in decode)
        out.writeLong(message.getSequenceId());
        // Body length, 4 bytes
        out.writeInt(bytes.length);
        // Serialized message body
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Magic number, 5 bytes (read but not validated here)
        byte[] magic = new byte[5];
        byteBuf.readBytes(magic, 0, magic.length);
        // Version, 1 byte
        byte version = byteBuf.readByte();
        // Serialization algorithm, 1 byte
        byte serializationType = byteBuf.readByte();
        // Message type, 1 byte
        byte messageType = byteBuf.readByte();
        // Sequence id, 8 bytes
        long sequenceId = byteBuf.readLong();
        // Body length, 4 bytes
        int length = byteBuf.readInt();
        // Message body
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes, 0, length);
        // Deserialize with the algorithm named in the header
        SerializeType type = SerializeType.values()[serializationType];
        Class<? extends Message> messageClass = Message.getMessageClass(messageType);
        // Let a failure propagate: Netty rejects null entries in the output list
        Message message = type.deserialize(messageClass, bytes);
        list.add(message);
    }
}
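
To make the byte layout of the header easier to see, here is a minimal sketch that builds and inspects one frame with `java.nio.ByteBuffer` instead of Netty's `ByteBuf`. The class `FrameLayoutDemo` and its helper methods are hypothetical names introduced for illustration. The field order and sizes follow the codec above, which puts the sequence id at byte 8 and the 4-byte length field at byte 16.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class FrameLayoutDemo {
    static final byte[] MAGIC = {'b', 'a', 'i', 'y', 'e'};
    // 5 (magic) + 1 (version) + 1 (serializer) + 1 (message type) = 8
    static final int SEQUENCE_ID_OFFSET = 8;
    // ... + 8 (sequence id) = 16
    static final int LENGTH_FIELD_OFFSET = 16;
    static final int HEADER_SIZE = LENGTH_FIELD_OFFSET + 4;

    public static void main(String[] args) {
        byte[] body = "hi".getBytes(StandardCharsets.UTF_8);
        byte[] frame = encode((byte) 1, (byte) 0, 42L, body);
        System.out.println("frame length: " + frame.length);
        System.out.println("sequence id:  " + sequenceIdOf(frame));
        System.out.println("body length:  " + bodyLengthOf(frame));
        System.out.println("body:         " + new String(bodyOf(frame), StandardCharsets.UTF_8));
    }

    static byte[] encode(byte serializer, byte messageType, long sequenceId, byte[] body) {
        // ByteBuffer is big-endian by default, like Netty's writeLong/writeInt
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.put(MAGIC);            // magic number, 5 bytes
        buf.put((byte) 1);         // version, 1 byte
        buf.put(serializer);       // serialization algorithm, 1 byte
        buf.put(messageType);      // message type, 1 byte
        buf.putLong(sequenceId);   // sequence id, 8 bytes
        buf.putInt(body.length);   // body length, 4 bytes
        buf.put(body);             // body
        return buf.array();
    }

    static long sequenceIdOf(byte[] frame) {
        return ByteBuffer.wrap(frame).getLong(SEQUENCE_ID_OFFSET);
    }

    static int bodyLengthOf(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(LENGTH_FIELD_OFFSET);
    }

    static byte[] bodyOf(byte[] frame) {
        // Unlike the codec above, this sketch also checks the magic number
        if (!Arrays.equals(MAGIC, Arrays.copyOfRange(frame, 0, MAGIC.length))) {
            throw new IllegalArgumentException("bad magic number");
        }
        return Arrays.copyOfRange(frame, HEADER_SIZE, HEADER_SIZE + bodyLengthOf(frame));
    }
}
```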
 

Load balancing: two strategies are implemented, random and round-robin.

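import com.alibaba.nacos.api.naming.pojo.Instance;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public interface LoadBalancer {
    // Picks one instance from the candidates returned by the registry
    Instance getInstance(List<Instance> instances);
}

public class RandomLoadBalance implements LoadBalancer {
    private static final Random random = new Random();

    @Override
    public Instance getInstance(List<Instance> instances) {
        // Uniformly random index in [0, size)
        return instances.get(random.nextInt(instances.size()));
    }
}

public class RoundRobinLoadBalance implements LoadBalancer {
    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public Instance getInstance(List<Instance> instances) {
        int current;
        int next;
        // CAS loop: advance the counter atomically and wrap to 0 before it overflows
        do {
            current = atomicInteger.get();
            next = current >= Integer.MAX_VALUE ? 0 : current + 1;
        } while (!atomicInteger.compareAndSet(current, next));

        return instances.get(next % instances.size());
    }
}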
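
The rotation itself can be tried without Nacos. The sketch below is a generic variant written for illustration, not the project's class: `RoundRobinDemo` and `RoundRobin<T>` are hypothetical names, and it swaps the CAS loop for `getAndIncrement` combined with `Math.floorMod`, which keeps the index non-negative even after the counter overflows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinDemo {
    public static void main(String[] args) {
        RoundRobin<String> balancer = new RoundRobin<>();
        List<String> nodes = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");
        // Four picks over three nodes: the fourth pick wraps around to the first node
        for (int i = 0; i < 4; i++) {
            System.out.println(balancer.pick(nodes));
        }
    }

    /** Thread-safe round-robin selector over an arbitrary candidate list. */
    static final class RoundRobin<T> {
        private final AtomicInteger counter = new AtomicInteger();

        T pick(List<T> candidates) {
            if (candidates.isEmpty()) {
                throw new IllegalArgumentException("no candidates to choose from");
            }
            // floorMod never returns a negative index, even when the counter has wrapped
            int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
            return candidates.get(index);
        }
    }
}
```

Unlike the listing above, this variant starts at the first element of the list, and it rejects an empty list explicitly rather than failing on a division by zero.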

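Handling TCP sticky and split packets:
The main idea is to add a length field to every message that records the length of the body, so the receiver can tell where one frame ends and the next begins.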

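import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
    public ProcotolFrameDecoder() {
        // Max frame 1024 bytes; the 4-byte length field starts at offset 16
        // (5 magic + 1 version + 1 serializer + 1 type + 8 sequence id);
        // no length adjustment, and no header bytes are stripped
        this(1024, 16, 4, 0, 0);
    }

    public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
}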
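
What the length-field decoder does with a byte stream can be sketched in plain Java. The `FrameSplitter` below is a simplified, hypothetical stand-in and not Netty's implementation: it buffers incoming chunks and emits a frame only once the header and the number of body bytes announced at offset 16 have both arrived. It ignores the maximum frame length, length adjustment, and byte stripping that the real decoder supports.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LengthFieldFramingDemo {
    static final int LENGTH_FIELD_OFFSET = 16;
    static final int LENGTH_FIELD_LENGTH = 4;
    static final int HEADER_SIZE = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;

    public static void main(String[] args) {
        byte[] first = frame("ping".getBytes(StandardCharsets.UTF_8));
        byte[] second = frame("pong!".getBytes(StandardCharsets.UTF_8));
        // Simulate TCP: both frames arrive glued together, cut at an arbitrary point
        byte[] stream = Arrays.copyOf(first, first.length + second.length);
        System.arraycopy(second, 0, stream, first.length, second.length);

        FrameSplitter splitter = new FrameSplitter();
        System.out.println("after 10 bytes: " + splitter.feed(Arrays.copyOfRange(stream, 0, 10)).size() + " frame(s)");
        List<byte[]> frames = splitter.feed(Arrays.copyOfRange(stream, 10, stream.length));
        for (byte[] f : frames) {
            System.out.println("frame body: " + bodyOf(f));
        }
    }

    /** Accumulates raw bytes and cuts them into complete frames. */
    static final class FrameSplitter {
        private byte[] pending = new byte[0];

        List<byte[]> feed(byte[] chunk) {
            // Append the new chunk to whatever was left over from earlier reads
            byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
            System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

            List<byte[]> frames = new ArrayList<>();
            int start = 0;
            while (joined.length - start >= HEADER_SIZE) {
                int bodyLength = ByteBuffer.wrap(joined, start + LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH).getInt();
                if (bodyLength < 0) {
                    throw new IllegalStateException("negative body length: " + bodyLength);
                }
                int frameLength = HEADER_SIZE + bodyLength;
                if (joined.length - start < frameLength) {
                    break; // split packet: wait for the rest of the body
                }
                frames.add(Arrays.copyOfRange(joined, start, start + frameLength));
                start += frameLength; // sticky packet: keep cutting from the same buffer
            }
            pending = Arrays.copyOfRange(joined, start, joined.length);
            return frames;
        }
    }

    /** Builds a frame whose first 16 header bytes are left as zeros. */
    static byte[] frame(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.position(LENGTH_FIELD_OFFSET);
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    static String bodyOf(byte[] frame) {
        return new String(frame, HEADER_SIZE, frame.length - HEADER_SIZE, StandardCharsets.UTF_8);
    }
}
```

The first call returns no frame because fewer than 20 header bytes have arrived. The second call returns both frames at once, which shows the split case and the sticky case in a single run.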

Registering services with Nacos and looking them up:
Automatic scan and registration

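public static void registerTogNacos() {
    // Register every service held by ServicesFactory under its fully qualified class name.
    // The key type Class<?> is assumed; the type argument is missing from the original listing.
    Set<Class<?>> classes = ServicesFactory.map.keySet();
    for (Class<?> clazz : classes) {
        try {
            NacosRegistryCentre.namingService.registerInstance(clazz.getName(), Config.IP, Config.PORT);
        } catch (NacosException e) {
            log.error("Failed to register {} with Nacos", clazz.getName(), e);
        }
    }
}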


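public class NacosRegistryCentre implements RegistryCentre {

    // The type arguments of the two maps below are missing from the original listing;
    // the ones shown here are assumptions. Neither map is used in the code shown.
    private static final ConcurrentHashMap<String, List<EventListener>> notified = new ConcurrentHashMap<>();

    private static final ConcurrentHashMap<String, List<Instance>> instancesCache = new ConcurrentHashMap<>();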

    private final RoundRobinLoadBalance roundRobinLoadBalance = new RoundRobinLoadBalance();

    public static NamingService namingService;

    static {
        try {
            namingService = NamingFactory.createNamingService(Config.NACOS_SERVER_ADDR);
        } catch (NacosException e) {
            log.error("Failed to connect to Nacos", e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public Instance getServices(String serviceName, String groupName) {
        try {
            // Query all instances of the service in the given group, then pick one by round-robin
            List<Instance> allInstances = namingService.getAllInstances(serviceName, groupName);
            if (allInstances.isEmpty()) {
                throw new RuntimeException("No available instance for service " + serviceName);
            }
            return roundRobinLoadBalance.getInstance(allInstances);
        } catch (NacosException e) {
            throw new RuntimeException("Failed to query Nacos for service " + serviceName, e);
        }
    }

    @Override
    public Instance getServices(String serviceName) {
        try {
            // Same lookup without an explicit group
            List<Instance> allInstances = namingService.getAllInstances(serviceName);
            if (allInstances.isEmpty()) {
                throw new RuntimeException("No available instance for service " + serviceName);
            }
            return roundRobinLoadBalance.getInstance(allInstances);
        } catch (NacosException e) {
            throw new RuntimeException("Failed to query Nacos for service " + serviceName, e);
        }
    }
}
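
To follow the register-then-look-up flow without a running Nacos server, it can be imitated in memory. The sketch below is only an illustration under that assumption: `InMemoryRegistry` is a hypothetical class, it stores plain `ip:port` strings instead of Nacos `Instance` objects, and the service name in `main` is made up.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

class InMemoryRegistryDemo {
    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        // Provider side: register the same service from two nodes
        registry.register("com.example.HelloService", "10.0.0.1", 8080);
        registry.register("com.example.HelloService", "10.0.0.2", 8080);
        // Consumer side: each lookup returns the next address in rotation
        for (int i = 0; i < 3; i++) {
            System.out.println(registry.getService("com.example.HelloService"));
        }
    }

    /** Stand-in for a registry centre: service name -> list of "ip:port" addresses. */
    static final class InMemoryRegistry {
        private final Map<String, List<String>> services = new ConcurrentHashMap<>();
        private final AtomicInteger counter = new AtomicInteger();

        void register(String serviceName, String ip, int port) {
            services.computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>())
                    .add(ip + ":" + port);
        }

        String getService(String serviceName) {
            List<String> addresses = services.get(serviceName);
            if (addresses == null || addresses.isEmpty()) {
                throw new IllegalStateException("no available instance for " + serviceName);
            }
            // Round-robin over the registered addresses
            return addresses.get(Math.floorMod(counter.getAndIncrement(), addresses.size()));
        }
    }
}
```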

Source code: https://gitee.com/alice-175/netty-rpc
