栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【手写一个RPC框架】simpleRPC-05

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【手写一个RPC框架】simpleRPC-05

前言

本次改进主要针对序列化的方式进行改进。之前一直在使用java自带的序列化方式,或者是netty的序列化方式,性能不高。

这个版本我们定义自己的序列化方式。

实现 项目创建

创建一个名为simpleRPC-05的 module

创建名为com.rpc的包

依赖配置

我们需要新加入一个fastjson依赖:

pom.xml



    
        SimpleRPC
        org.example
        1.0-SNAPSHOT
    
    4.0.0

    simpleRPC-05

    
        8
        8
    

    
        
        
            org.projectlombok
            lombok
            1.18.12
            provided
        

        
            io.netty
            netty-all
            4.1.51.Final
        

        
        
            com.alibaba
            fastjson
            1.2.67
        
    



common

我们需要修改 RPCResponse.java 来适应新的序列化方式:

RPCResponse.java

package com.rpc.common;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.io.Serializable;


@Data
@Builder
@AllArgsConstructor
public class RPCResponse implements Serializable {
    
    private int code;
    private String message;
    // 更新,这里我们需要加入这个,不然用其它序列化方式(除了java Serialize)得不到data的type
    private Class dataType;
    private Object data;

    public static RPCResponse success(Object data) {
        return RPCResponse.builder().code(200).data(data).dataType(data.getClass()).build();
    }
    public static RPCResponse fail() {
        return RPCResponse.builder().code(500).message("服务器发生错误").build();
    }
}


RPCRequest.java 跟原来一样

package com.rpc.common;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.io.Serializable;


@Data
@Builder
@AllArgsConstructor
public class RPCRequest implements Serializable {
    // 服务类名,客户端只知道接口名,在服务端中用接口名指向实现类
    private String interfaceName;
    // 方法名
    private String methodName;
    // 参数列表
    private Object[] params;
    // 参数类型
    private Class[] paramsTypes;
}

Blog.java

package com.rpc.common;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Blog implements Serializable {
    private Integer id;
    private Integer useId;
    private String title;
}

User.java

package com.rpc.common;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    // 客户端和服务端共有的,模拟RPC中传输的信息
    private Integer id;
    private String userName;
    private Boolean sex;
}

service

ServiceProvider.java

package com.rpc.server;


import java.util.HashMap;
import java.util.Map;


public class ServiceProvider {
    
    private Map interfaceProvider;

    // 构造函数,初始化一个空的 hashmap 赋给 Map interfaceProvider
    public ServiceProvider() {
        this.interfaceProvider = new HashMap<>();
    }

    public void provideServiceInterface(Object service) {
        // 反射,.getClass().getInterfaces()得到class的interface,按照interfaces name(key)和object(value)存入map
        Class[] interfaces = service.getClass().getInterfaces();
        for (Class clazz : interfaces) {
            interfaceProvider.put(clazz.getName(), service);
        }
    }

    public Object getService(String interfaceName) {
        return interfaceProvider.get(interfaceName);  // 通过interface name得到object
    }
}

UserService.java

package com.rpc.server;

import com.rpc.common.User;


public interface UserService {

    // 客户端通过这个接口调用服务端的实现类
    User getUserByUserId(Integer id);

    // 给这个服务增加一个功能
    Integer insertUserId(User user);
}

UserServiceImpl.java

package com.rpc.server;

import com.rpc.common.User;



public class UserServiceImpl implements UserService {

    @Override
    public User getUserByUserId(Integer id) {
        // 模拟从数据库中取用户的行为
        User user = User.builder()
                        .id(id)
                        .userName("he2121")
                        .sex(true).build();
        System.out.println("客户端查询了" + id + "的用户");
        return user;
    }

    @Override
    public Integer insertUserId(User user) {
        System.out.println("插入数据成功: " + user);
        return 1;
    }
}

BlogService.java

package com.rpc.server;

import com.rpc.common.Blog;

public interface BlogService {

    Blog getBlogById(Integer id);
}

BlogServiceImpl.java

package com.rpc.server;

import com.rpc.common.Blog;

public class BlogServiceImpl implements BlogService {

    @Override
    public Blog getBlogById(Integer id) {
        Blog blog = Blog.builder()
                        .id(id)
                        .title("我的博客")
                        .useId(22).build();
        System.out.println("客户端查询了" + id + "博客");
        return blog;
    }
}

codec

我们定义序列化接口:

Serializer.java

package com.rpc.codec;


public interface Serializer {

    // 把对象序列化成字节数组
    byte[] serialize(Object obj);
    // 从字节数组反序列化成消息, 使用java自带序列化方式不用messageType也能得到相应的对象(序列化字节数组里包含类信息)
    // 其它方式需指定消息格式,再根据message转化成相应的对象
    Object deserialize(byte[] bytes, int messageType);
    // 返回使用的序列器,是哪个
    // 0:java自带序列化方式, 1: json序列化方式
    int getType();
    // 根据序号取出序列化器,暂时有两种实现方式,需要其它方式,实现这个接口即可
    static Serializer getSerializerByCode(int code){
        switch (code){
            case 0:
                return new ObjectSerializer();
            case 1:
                return new JsonSerializer();
            default:
                return null;
        }
    }
}

定义序列化类

ObjectSerializer.java

package com.rpc.codec;

import java.io.*;

public class ObjectSerializer implements Serializer {

    // 利用java IO 对象 -> 字节数组
    @Override
    public byte[] serialize(Object obj) {
        byte[] bytes = null;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj);
            oos.flush();
            bytes = bos.toByteArray();
            oos.close();
            bos.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return bytes;
    }

    // 字节数组 -> 对象
    @Override
    public Object deserialize(byte[] bytes, int messageType) {
        Object obj = null;
        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
        try {
            ObjectInputStream ois = new ObjectInputStream(bis);
            obj = ois.readObject();
            ois.close();
            bis.close();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        return obj;
    }

    // 0 代表java原生序列化器
    @Override
    public int getType() {
        return 0;
    }
}

JsonSerializer.java

package com.rpc.codec;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;


public class JsonSerializer implements Serializer{
    // 序列化底层是FASTJSON把对象转化为字节流(byte数组)
    @Override
    public byte[] serialize(Object obj) {
        byte[] bytes = JSONObject.toJSonBytes(obj);
        return bytes;
    }

    // 反序列化
    @Override
    public Object deserialize(byte[] bytes, int messageType) {
        Object obj = null;
        // 传输的消息分为request与response
        switch (messageType){
            case 0:  // 如果是request
                RPCRequest request = JSON.parseObject(bytes, RPCRequest.class);
                // 修bug 参数为空 直接返回
                if(request.getParams() == null) return request;

                Object[] objects = new Object[request.getParams().length];
                // 把json字串转化成对应的对象, fastjson可以读出基本数据类型,不用转化
                for(int i = 0; i < objects.length; i++){
                    Class paramsType = request.getParamsTypes()[i];
                    if (!paramsType.isAssignableFrom(request.getParams()[i].getClass())){
                        objects[i] = JSONObject.toJavaObject((JSONObject) request.getParams()[i],request.getParamsTypes()[i]);
                    }else{
                        objects[i] = request.getParams()[i];
                    }

                }
                request.setParams(objects);
                obj = request;
                break;
            case 1:  // 如果是response
                RPCResponse response = JSON.parseObject(bytes, RPCResponse.class);
                Class dataType = response.getDataType();
                if(! dataType.isAssignableFrom(response.getData().getClass())){
                    response.setData(JSONObject.toJavaObject((JSONObject) response.getData(),dataType));
                }
                obj = response;
                break;
            default:
                System.out.println("暂时不支持此种消息");
                throw new RuntimeException();
        }
        return obj;
    }

    // 1 代表着json序列化方式
    @Override
    public int getType() {
        return 1;
    }
}

MessageType.java

package com.rpc.codec;

import lombok.AllArgsConstructor;

@AllArgsConstructor
public enum MessageType {
    REQUEST(0),RESPonSE(1);
    
    private int code;
    
    public int getCode() {
        return code;
    }
}

MyEncode.java

package com.rpc.codec;


import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.AllArgsConstructor;


@AllArgsConstructor
public class MyEncode extends MessageToByteEncoder {
    private Serializer serializer;

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        // 写入消息类型
        if(msg instanceof RPCRequest){
            out.writeShort(MessageType.REQUEST.getCode());
        }
        else if(msg instanceof RPCResponse){
            out.writeShort(MessageType.RESPONSE.getCode());
        }
        // 写入序列化方式
        out.writeShort(serializer.getType());
        // 得到序列化数组
        byte[] serialize = serializer.serialize(msg);
        // 写入长度
        out.writeInt(serialize.length);
        // 写入序列化字节数组
        out.writeBytes(serialize);
    }
}

MyDecode.java

package com.rpc.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.AllArgsConstructor;

import java.util.List;


@AllArgsConstructor
public class MyDecode extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        // 1. 读取消息类型
        short messageType = in.readShort();
        // 现在还只支持request与response请求
        if(messageType != MessageType.REQUEST.getCode() &&
                messageType != MessageType.RESPONSE.getCode()){
            System.out.println("暂不支持此种数据");
            return;
        }
        // 2. 读取序列化的类型
        short serializerType = in.readShort();
        // 根据类型得到相应的序列化器
        Serializer serializer = Serializer.getSerializerByCode(serializerType);
        if (serializer == null) throw new RuntimeException("不存在对应的序列化器");
        // 3. 读取数据序列化后的字节长度
        int length = in.readInt();
        // 4. 读取序列化数组
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        // 用对应的序列化器解码字节数组
        Object deserialize = serializer.deserialize(bytes, messageType);
        out.add(deserialize);
    }
}
 
client 

客户端部分和simpleRPC-04基本一样,但是NettyClientHandler.java 需要修改一下:

NettyClientInitializer.java

package com.rpc.client;

import com.rpc.codec.JsonSerializer;
import com.rpc.codec.MyDecode;
import com.rpc.codec.MyEncode;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldbasedframeDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;


public class NettyClientInitializer extends ChannelInitializer {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 使用自定义的编解码器
        pipeline.addLast(new MyDecode());
        // 编码需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的
        pipeline.addLast(new MyEncode(new JsonSerializer()));
        pipeline.addLast(new NettyClientHandler());
    }
}

其他部分都一样。

server

server部分和simpleRPC-04基本一样,同样的,NettyServerInitializer.java需要修改一下:

NettyServerInitializer.java

package com.rpc.server;

import com.rpc.codec.JsonSerializer;
import com.rpc.codec.MyDecode;
import com.rpc.codec.MyEncode;
import com.rpc.service.ServiceProvider;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldbasedframeDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.AllArgsConstructor;


@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer {
    private ServiceProvider serviceProvider;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 使用自定义的解码器
        pipeline.addLast(new MyDecode());
        // 使用自定义的编码器,而且解码器需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的
        pipeline.addLast(new MyEncode(new JsonSerializer()));
        pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
    }
}

其他部分一样。

文件结构

simpleRPC-05文件结构如下:


运行

运行TestServer.java

再运行TestClient.java

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号