栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

手打RPC-构建一个通信核心

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

手打RPC-构建一个通信核心

本篇开始准备手打一个简易的RPC框架。
主要实现功能:

  1. 序列化通信
    自定义通信协议,进行rpc序列化以及反序列化
  2. 动态注入
    服务启动自动注入消费者以及提供者
  3. 注册中心
    注册中心获取服务,并监听服务变化
  4. 拦击器功能
    提供请求响应拦截功能,提供扩展接口
  5. 负载均衡
    实现客户端对服务的负载均衡功能

整体流程如下:

本章主要实现序列化功能,这里以Netty为核心进行构建。

一.pom依赖

jdk为1.8,pom 文件依赖如下:

	
        4.1.74.Final
        1.7.4
        5.2.12.RELEASE
        1.2.72
        1.7.30
    

    
        
            org.springframework
            spring-context
            ${spring-framework.version}
        
        
            com.alibaba
            fastjson
            ${fastjson.version}
        
        
            io.netty
            netty-all
            ${netty-version}
        
        
            io.protostuff
            protostuff-core
            ${protostuff-version}
        
        
            io.protostuff
            protostuff-runtime
            ${protostuff-version}
        
        
            org.slf4j
            slf4j-api
            ${slf4j.version}
        
    
二. 实现客户端启动类

客户端代码如下:

public class NettyClientTask implements Runnable {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public ChannelFuture channelFuture;

    private final String ip;

    private final int port;

    public NettyClientTask(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    @Override
    public void run() {
    	// 创建线程组
         NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootStrap = new Bootstrap();
            bootStrap.group(group)
            		  // 通道类型
                     .channel(NioSocketChannel.class)
                     .option(ChannelOption.SO_KEEPALIVE, true)
                     .option(ChannelOption.AUTO_READ, true)
                      // 设置管道上的处理器
                     .handler(new ChannelInitializer(){
                         @Override
                         protected void initChannel(SocketChannel socketChannel) throws Exception {
                             socketChannel.pipeline()
                                          .addLast(new DataDecoder(Response.class))
                                          .addLast(new DataEncoder(Request.class))
                                          .addLast(new ClientHandle());
                         }
                     });
            log.info("开始启动客户端...");
            channelFuture = bootStrap.connect(ip, port).sync();
            log.info("启动客户端结束");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("客户端启动异常",e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
三. 实现服务端启动类

服务端启动类代码如下:

public class NettyServerTask implements Runnable{

    private final Logger log = LoggerFactory.getLogger(this.getClass());


    public static ChannelFuture channelFuture;

    private final String ip;

    private final int port;

    public NettyServerTask(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    @Override
    public void run() {
    	// 创建线程组,bossGroup用于接收连接,workGroup用于处理读写事件
         EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
            		  // 通道类型
                     .channel(NioServerSocketChannel.class)
                     .option(ChannelOption.SO_BACKLOG, 128)
                     .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                     .childOption(ChannelOption.SO_KEEPALIVE, true)
                     // 设置管道上的处理器
                     .childHandler(new ChannelInitializer(){
                         @Override
                         protected void initChannel(SocketChannel socketChannel) throws Exception {
                             socketChannel.pipeline()
                                          .addLast(new DataDecoder(Request.class))
                                          .addLast(new DataEncoder(Response.class))
                                          .addLast(new ServerHandle());
                         }
                     });
            log.info("开始启动服务端...");
            channelFuture = bootstrap.bind(ip,port).sync();
            log.info("启动服务端结束");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("客户端启动异常",e);
        } finally{
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
四. 处理器

自定义Netty处理器,实现读写事件功能。

  • 客户端处理器如下:
public class ClientHandle extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Response response = (Response) msg;
        RainFuture future = ConsumerContext.RESULT_MAP.get(response.getId());
        if (future != null) {
            future.setResponse(response);
        }else{
            throw new RuntimeException("未找到对应future");
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}
  • 服务端处理器如下:
public class ServerHandle extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request= (Request) msg;
        try {
            Object result = ProviderContext.exec(request.getClassName(), request.getMethodName(), request.getData());
            Response response = Response.success(request.getId());
            response.setData(result);
            ctx.writeAndFlush(response);
        } catch (Exception e) {
            Response response = Response.error(request.getId());
            response.setMsg(e.getMessage());
            response.setCause(e);
            ctx.writeAndFlush(response);
        }


    }


}
四. 自定义通信协议

rpc通信协议和Http协议比较,目前主要区别是通用性上。rpc协议有一套自有的规则,类似方言,黑话,其他协议规则无法进行解析。

通信规则:

  1. 定义请求头
    对于请求头,可以指定请求的协议类型,版本号信息,过滤非约定的请求
  2. 定义序列化规则
    序列以及反序列,这里采用protostuff处理。它是Protocol Buffer 的包装版,比Protocol Buffer 更加易用,而且继承了Protocol Buffer 的高性能特点。
  • 数据序列化处理器:
public class DataEncoder extends MessageToByteEncoder {

    private Class genericClass;

    public DataEncoder(Class genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        byte[] bytes = SerializeUtil.serialize(o);
        // 模拟请求头,协议类型等数据
        byteBuf.writeInt(1);
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
    }
}

  • 数据反序列化处理器:
public class DataDecoder extends ByteToMessageDecoder {

    private Class genericClass;

    public DataDecoder(Class genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List list) throws Exception {
        // 1个int对应4个字节
        if (buf.readableBytes() < 8) {
            return;
        }
        buf.markReaderIndex();
        // 处理版本号等
        int header = buf.readInt();
		if (header != 1) {
            throw new RuntimeException("不支持当前请求");
        }
        int dataLength = buf.readInt();
        if (buf.readableBytes() < dataLength) {
            buf.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        buf.readBytes(data);
        list.add(SerializeUtil.deserialize(data, genericClass));
    }
}
 

如何处理半包和粘包问题?
数据传输以字节流的形式进行的。
如果没标识,服务程序是无法辨别流的开始和结束,可能导致流数据不完整的情况。
本例对于这种情况,采用传输总字节大小方式,告诉服务接收端,数据大小。当然也可以采用特殊字符,作为标识的方式。

序列化工具类如下:

public class SerializeUtil {

    private final static LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

    private final static Map, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>();

    
    @SuppressWarnings("unchecked")
    public static  byte[] serialize(T obj) {
        Class clazz = (Class) obj.getClass();
        Schema schema = getSchema(clazz);
        byte[] data;
        try {
            data = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
        } finally {
            BUFFER.clear();
        }

        return data;
    }

    
    public static  T deserialize(byte[] data, Class clazz) {
        Schema schema = getSchema(clazz);
        T obj = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(data, obj, schema);
        return obj;
    }

    @SuppressWarnings("unchecked")
    private static  Schema getSchema(Class clazz) {
        Schema schema = (Schema) SCHEMA_CACHE.get(clazz);
        if (Objects.isNull(schema)) {
            schema = RuntimeSchema.getSchema(clazz);
            SCHEMA_CACHE.put(clazz, schema);
        }
        return schema;
    }

}

项目详细代码参考

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号