栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

c++ rpc(rpc实现几种方式)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

c++ rpc(rpc实现几种方式)

一起实现RPC,超详细!!! 第三篇 Netty:

在之前,我们实现了基于传统BIO方式的传输模式。这里,我们将使用效率更高、更方便的Netty进行传输。


实现过程如下: 一、导入依赖 1.导入Netty依赖

RPC-CORE


    io.netty
    netty-all
    4.1.50.Final

2.导入jackson依赖
        
            com.fasterxml.jackson.core
            jackson-core
            2.11.0
        
        
            com.fasterxml.jackson.core
            jackson-databind
            2.11.0
        
        
            com.fasterxml.jackson.core
            jackson-annotations
            2.11.0
        
二、序列化 1.序列化接口

创建CommonSerializer接口(switch case 方便后续添加其他序列化方法)

package com.t598.core.serializer;

public interface CommonSerializer {
    byte[] serializer(Object object);

    Object deserialize(byte[] bytes, Class clazz) throws Exception;

    int getCode();

    static CommonSerializer getByte(int code) {
        switch (code) {
            case 1:
                return new JsonSerializer();
            default:
                return null;
        }
    }
}

2.实现JSON的序列化器

使用JSON序列化器无法保证反序列化后仍然为原实例类型,后续会修改

package com.t598.core.serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.t598.common.entity.RpcRequest;
import com.t598.common.enumeration.SerializerCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class JsonSerializer implements CommonSerializer{
    private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);

    private ObjectMapper objectMapper = new ObjectMapper();

    
    @Override
    public byte[] serializer(Object object) {
        try {
            // 将Object值序列化为字节数组
            return objectMapper.writevalueAsBytes(object);
        } catch (JsonProcessingException e) {
            logger.error("序列化时发生错误:{}", e.getMessage());
            e.printStackTrace();
            return null;
        }
    }

    
    @Override
    public Object deserialize(byte[] bytes, Class clazz) throws Exception {
        try {
            // 根据clazz将bytes反序列换为对应的实例,还需要判断反序列化后实例类型是否正确
            // 因为RepRequest中有一个Object的数据,进行反序列时可能会失败,
            Object obj = objectMapper.readValue(bytes, clazz);
            if (obj instanceof RpcRequest) {
                obj = handleRequest(obj);
            }
            return obj;
        } catch (IOException e) {
            logger.error("反序列化时发生错误:{}", e.getMessage());
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public int getCode() {
        return SerializerCode.valueOf("JSON").getCode();
    }

    private Object handleRequest(Object obj) throws IOException {
        RpcRequest rpcRequest = (RpcRequest) obj;
        for (int i = 0; i < rpcRequest.getParamTypes().length; i++) {
            Class clazz = rpcRequest.getParamTypes()[i];
            // 确定此Class对象表示的类或接口是否与指定的Class参数表示的类或接口相同,
            // 或者是其超类或超接口。 如果是,则返回true ; 否则返回false 。
            if (!clazz.isAssignableFrom(rpcRequest.getParameters()[i].getClass())) {
                // 如果不对应,则将这个参数重新反序列化为对应的参数类型。
                byte[] bytes = objectMapper.writevalueAsBytes(rpcRequest.getParameters()[i]);
                rpcRequest.getParameters()[i] = objectMapper.readValue(bytes, clazz);
            }
        }
        return rpcRequest;
    }
}

三、协议 1.自定义协议
    @Override
    public void start(int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_BACKLOG, 256)  //服务端接收连接的队列长度
                    .option(ChannelOption.SO_KEEPALIVE, true)  //心跳机制,保持心跳连接
                    .childOption(ChannelOption.TCP_NODELAY, true)  //立即发送数据
                    .childHandler(new ChannelInitializer(){
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new CommonEncoder(new JsonSerializer()));
                            pipeline.addLast(new CommonDecoder());
                            pipeline.addLast(new NettyServerHandler());
                    }
        });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        }catch (InterruptedException e){
            logger.error("启动服务器时有错误发生: ", e);
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

NettyClient

package com.t598.core.transport.netty.client;

import com.t598.common.entity.RpcRequest;
import com.t598.common.entity.RpcResponse;
import com.t598.core.codec.CommonDecoder;
import com.t598.core.codec.CommonEncoder;
import com.t598.core.serializer.JsonSerializer;
import com.t598.core.transport.RpcClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyClient implements RpcClient {
    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    private String host;
    private int port;
    private static final Bootstrap bootstrap;

    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    
    static {
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new CommonDecoder());
                        pipeline.addLast(new CommonEncoder(new JsonSerializer()));
                        pipeline.addLast(new NettyClientHandler());
                    }
                })
                .option(ChannelOption.SO_KEEPALIVE, true);
    }

    
    @Override
    public Object sendRequest(RpcRequest rpcRequest) {
        try {
            ChannelFuture future = bootstrap.connect(host, port).sync();
            logger.info("客户端连接到服务器:{}:{}", host, port);
            // 获取到连接的通道
            Channel channel = future.channel();
            if (channel != null) {
                // 异步方法进行监听,当发送数据后判断是否发送成功,如果成功的话打印成功的日志,否则打印失败的日志
                // 这里的发送时非阻塞的,判断是否发送成功但不能得到数据,
                channel.writeAndFlush(rpcRequest).addListener(future1 -> {
                    if (future1.isSuccess()) {
                        logger.info(String.format("客户端发送消息:%s", rpcRequest.toString()));
                    } else {
                        logger.error("发送消息时有错误发生:", future.cause());
                    }
                });
                channel.closeFuture().sync();
                // 这里通过获取到key 的方式获取到返回结果。
                AttributeKey key = AttributeKey.valueOf("rpcResponse");
                RpcResponse rpcResponse = channel.attr(key).get();
                return rpcResponse.getData();
            }
        } catch (InterruptedException e) {
            logger.error("发送消息时有错误发生:", e);
        }
        return null;
    }
}

3.Handler

NettyClientHandler

package com.t598.core.transport.netty.client;

import com.t598.common.entity.RpcRequest;
import com.t598.common.entity.RpcResponse;
import com.t598.common.factory.SingletonFactory;
import com.t598.core.serializer.CommonSerializer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;


public class NettyClientHandler extends SimpleChannelInboundHandler {

    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

    private final UnprocessedRequests unprocessedRequests;

    public NettyClientHandler() {
        this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
        try {
            logger.info(String.format("客户端接收到消息: %s", msg));
            unprocessedRequests.complete(msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("过程调用时有错误发生:");
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                logger.info("发送心跳包 [{}]", ctx.channel().remoteAddress());
                Channel channel = ChannelProvider.get((InetSocketAddress) ctx.channel().remoteAddress(), CommonSerializer.getByte(CommonSerializer.DEFAULT_SERIALIZER));
                RpcRequest rpcRequest = new RpcRequest();
                rpcRequest.setHeartBeat(true);
                channel.writeAndFlush(rpcRequest).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

NettyServerHandler

package com.t598.core.transport.netty.server;

import com.t598.common.entity.RpcRequest;
import com.t598.common.entity.RpcResponse;
import com.t598.common.factory.SingletonFactory;
import com.t598.core.handler.RequestHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class NettyServerHandler extends SimpleChannelInboundHandler {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    private final RequestHandler requestHandler;

    public NettyServerHandler() {
        this.requestHandler = SingletonFactory.getInstance(RequestHandler.class);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
        try {
            if(msg.getHeartBeat()) {
                logger.info("接收到客户端心跳包...");
                return;
            }
            logger.info("服务器接收到请求: {}", msg);
            Object result = requestHandler.handle(msg);
            if (ctx.channel().isActive() && ctx.channel().isWritable()) {
                ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()));
            } else {
                logger.error("通道不可写");
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("处理过程调用时有错误发生:");
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                logger.info("长时间未收到心跳包,断开连接...");
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

}

4.额外的类

PackageType(PRC-COMMON)

package com.t598.common.enumeration;

import lombok.AllArgsConstructor;
import lombok.Getter;


@AllArgsConstructor
@Getter
public enum PackageType {

    REQUEST_PACK(0),
    RESPONSE_PACK(1);

    private final int code;

}

SerializerCode(PRC-COMMON)

package com.t598.common.enumeration;

import lombok.AllArgsConstructor;
import lombok.Getter;


@AllArgsConstructor
@Getter
public enum SerializerCode {

    KRYO(0),
    JSON(1),
    HESSIAN(2),
    PROTOBUF(3);

    private final int code;

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773513.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号