栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java 从零开始手写 RPC (06) reflect 反射实现通用调用之客户端

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java 从零开始手写 RPC (06) reflect 反射实现通用调用之客户端

通用调用

java 从零开始手写 RPC (01) 基于 socket 实现

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

java 从零开始手写 RPC (03) 如何实现客户端调用服务端?

java 从零开始手写 RPC (04) -序列化

上一篇我们介绍了,如何实现基于反射的通用服务端。

这一节我们来一起学习下如何实现通用客户端。

因为内容较多,所以拆分为 2 个部分。

基本思路

所有的方法调用,基于反射进行相关处理实现。

核心类

为了便于拓展,我们把核心类调整如下:

package com.github.houbb.rpc.client.core;

import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.context.RpcClientContext;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import com.github.houbb.rpc.common.constant.RpcConstant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;


@ThreadSafe
public class RpcClient {

    private static final Log log = LogFactory.getLog(RpcClient.class);

    
    private final String address;

    
    private final int port;

    
    private final ChannelHandler channelHandler;

    public RpcClient(final RpcClientContext clientContext) {
        this.address = clientContext.address();
        this.port = clientContext.port();
        this.channelHandler = clientContext.channelHandler();
    }

    
    public ChannelFuture connect() {
        // 启动服务端
        log.info("RPC 服务开始启动客户端");

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        
        ChannelFuture channelFuture;
        try {
            Bootstrap bootstrap = new Bootstrap();
            channelFuture = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer(){
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline()
                                    // 解码 bytes=>resp
                                    .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                                    // request=>bytes
                                    .addLast(new ObjectEncoder())
                                    // 日志输出
                                    .addLast(new LoggingHandler(LogLevel.INFO))
                                    .addLast(channelHandler);
                        }
                    })
                    .connect(address, port)
                    .syncUninterruptibly();
            log.info("RPC 服务启动客户端完成,监听地址 {}:{}", address, port);
        } catch (Exception e) {
            log.error("RPC 客户端遇到异常", e);
            throw new RuntimeException(e);
        }
        // 不要关闭线程池!!!

        return channelFuture;
    }

}

可以灵活指定对应的服务端地址、端口信息。

ChannelHandler 作为处理参数传入。

ObjectDecoder、ObjectEncoder、LoggingHandler 都和服务端类似,是 netty 的内置实现。

RpcClientHandler

客户端的 handler 实现如下:


package com.github.houbb.rpc.client.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


public class RpcClientHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcClient.class);

    
    private final InvokeService invokeService;

    public RpcClientHandler(InvokeService invokeService) {
        this.invokeService = invokeService;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse)msg;
        invokeService.addResponse(rpcResponse.seqId(), rpcResponse);
        log.info("[Client] response is :{}", rpcResponse);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 每次用完要关闭,不然拿不到response,我也不知道为啥(目测得了解netty才行)
        // 个人理解:如果不关闭,则永远会被阻塞。
        ctx.flush();
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

只有 channelRead0 做了调整,基于 InvokeService 对结果进行处理。

InvokeService 接口
package com.github.houbb.rpc.client.invoke;

import com.github.houbb.rpc.common.rpc.domain.RpcResponse;


public interface InvokeService {

    
    InvokeService addRequest(final String seqId);

    
    InvokeService addResponse(final String seqId, final RpcResponse rpcResponse);

    
    RpcResponse getResponse(final String seqId);

}

主要是对入参、出参的设置,以及出参的获取。

实现
package com.github.houbb.rpc.client.invoke.impl;

import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.common.exception.RpcRuntimeException;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


public class DefaultInvokeService implements InvokeService {

    private static final Log LOG = LogFactory.getLog(DefaultInvokeService.class);

    
    private final Set requestSet;

    
    private final ConcurrentHashMap responseMap;

    public DefaultInvokeService() {
        requestSet = Guavas.newHashSet();
        responseMap = new ConcurrentHashMap<>();
    }

    @Override
    public InvokeService addRequest(String seqId) {
        LOG.info("[Client] start add request for seqId: {}", seqId);
        requestSet.add(seqId);
        return this;
    }

    @Override
    public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {
        // 这里放入之前,可以添加判断。
        // 如果 seqId 必须处理请求集合中,才允许放入。或者直接忽略丢弃。
        LOG.info("[Client] 获取结果信息,seq: {}, rpcResponse: {}", seqId, rpcResponse);
        responseMap.putIfAbsent(seqId, rpcResponse);

        // 通知所有等待方
        LOG.info("[Client] seq 信息已经放入,通知所有等待方", seqId);

        synchronized (this) {
            this.notifyAll();
        }

        return this;
    }

    @Override
    public RpcResponse getResponse(String seqId) {
        try {
            RpcResponse rpcResponse = this.responseMap.get(seqId);
            if(ObjectUtil.isNotNull(rpcResponse)) {
                LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
                return rpcResponse;
            }

            // 进入等待
            while (rpcResponse == null) {
                LOG.info("[Client] seq {} 对应结果为空,进入等待", seqId);
                // 同步等待锁
                synchronized (this) {
                    this.wait();
                }

                rpcResponse = this.responseMap.get(seqId);
                LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);
            }

            return rpcResponse;
        } catch (InterruptedException e) {
            throw new RpcRuntimeException(e);
        }
    }
}

使用 requestSet 存储对应的请求入参。

使用 responseMap 存储对应的请求出参,在获取的时候通过同步 while 循环等待,获取结果。

此处,通过 notifyAll() 和 wait() 进行等待和唤醒。

ReferenceConfig-服务端配置 说明

我们想调用服务端,首先肯定要定义好要调用的对象。

ReferenceConfig 就是要告诉 rpc 框架,调用的服务端信息。

接口
package com.github.houbb.rpc.client.config.reference;

import com.github.houbb.rpc.common.config.component.RpcAddress;

import java.util.List;


public interface ReferenceConfig {

    
    ReferenceConfig serviceId(final String serviceId);

    
    String serviceId();

    
    Class serviceInterface();

    
    ReferenceConfig serviceInterface(final Class serviceInterface);

    
    ReferenceConfig addresses(final String addresses);

    
    T reference();

}
实现
package com.github.houbb.rpc.client.config.reference.impl;

import com.github.houbb.heaven.constant.PunctuationConst;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.NumUtil;
import com.github.houbb.rpc.client.config.reference.ReferenceConfig;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.core.context.impl.DefaultRpcClientContext;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService;
import com.github.houbb.rpc.client.proxy.ReferenceProxy;
import com.github.houbb.rpc.client.proxy.context.ProxyContext;
import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext;
import com.github.houbb.rpc.common.config.component.RpcAddress;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;

import java.util.List;


public class DefaultReferenceConfig implements ReferenceConfig {

    
    private String serviceId;

    
    private Class serviceInterface;

    
    private List rpcAddresses;

    
    private List channelFutures;

    
    @Deprecated
    private RpcClientHandler channelHandler;

    
    private InvokeService invokeService;

    public DefaultReferenceConfig() {
        // 初始化信息
        this.rpcAddresses = Guavas.newArrayList();
        this.channelFutures = Guavas.newArrayList();
        this.invokeService = new DefaultInvokeService();
    }

    @Override
    public String serviceId() {
        return serviceId;
    }

    @Override
    public DefaultReferenceConfig serviceId(String serviceId) {
        this.serviceId = serviceId;
        return this;
    }

    @Override
    public Class serviceInterface() {
        return serviceInterface;
    }

    @Override
    public DefaultReferenceConfig serviceInterface(Class serviceInterface) {
        this.serviceInterface = serviceInterface;
        return this;
    }

    @Override
    public ReferenceConfig addresses(String addresses) {
        ArgUtil.notEmpty(addresses, "addresses");

        String[] addressArray = addresses.split(PunctuationConst.COMMA);
        ArgUtil.notEmpty(addressArray, "addresses");

        for(String address : addressArray) {
            String[] addressSplits = address.split(PunctuationConst.colon);
            if(addressSplits.length < 2) {
                throw new IllegalArgumentException("Address must be has ip and port, like 127.0.0.1:9527");
            }
            String ip = addressSplits[0];
            int port = NumUtil.toIntegerThrows(addressSplits[1]);
            // 包含权重信息
            int weight = 1;
            if(addressSplits.length >= 3) {
                weight = NumUtil.toInteger(addressSplits[2], 1);
            }

            RpcAddress rpcAddress = new RpcAddress(ip, port, weight);
            this.rpcAddresses.add(rpcAddress);
        }

        return this;
    }

    
    @Override
    public T reference() {
        // 1. 启动 client 端到 server 端的连接信息
        // 1.1 为了提升性能,可以将所有的 client=>server 的连接都调整为一个 thread。
        // 1.2 初期为了简单,直接使用同步循环的方式。
        // 创建 handler
        // 循环连接
        for(RpcAddress rpcAddress : rpcAddresses) {
            final ChannelHandler channelHandler = new RpcClientHandler(invokeService);
            final DefaultRpcClientContext context = new DefaultRpcClientContext();
            context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler);
            ChannelFuture channelFuture = new RpcClient(context).connect();
            // 循环同步等待
            // 如果出现异常,直接中断?捕获异常继续进行??
            channelFutures.add(channelFuture);
        }

        // 2. 接口动态代理
        ProxyContext proxyContext = buildReferenceProxyContext();
        return ReferenceProxy.newProxyInstance(proxyContext);
    }

    
    private ProxyContext buildReferenceProxyContext() {
        DefaultProxyContext proxyContext = new DefaultProxyContext<>();
        proxyContext.serviceId(this.serviceId);
        proxyContext.serviceInterface(this.serviceInterface);
        proxyContext.channelFutures(this.channelFutures);
        proxyContext.invokeService(this.invokeService);
        return proxyContext;
    }

}

这里主要根据指定的服务端信息,初始化对应的代理实现。

这里还可以拓展指定权重,便于后期负载均衡拓展,本期暂时不做实现。

ReferenceProxy 说明

所有的 rpc 调用,客户端只有服务端的接口。

那么,怎么才能和调用本地方法一样调用远程方法呢?

答案就是动态代理。

实现

实现如下:

package com.github.houbb.rpc.client.proxy;

import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.proxy.context.ProxyContext;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcRequest;
import com.github.houbb.rpc.common.support.id.impl.Uuid;
import com.github.houbb.rpc.common.support.time.impl.DefaultSystemTime;
import io.netty.channel.Channel;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;


public class ReferenceProxy implements InvocationHandler {

    private static final Log LOG = LogFactory.getLog(ReferenceProxy.class);

    
    private final ProxyContext proxyContext;

    
    private ReferenceProxy(ProxyContext proxyContext) {
        this.proxyContext = proxyContext;
    }

    
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 反射信息处理成为 rpcRequest
        final String seqId = Uuid.getInstance().id();
        final long createTime = DefaultSystemTime.getInstance().time();
        DefaultRpcRequest rpcRequest = new DefaultRpcRequest();
        rpcRequest.serviceId(proxyContext.serviceId());
        rpcRequest.seqId(seqId);
        rpcRequest.createTime(createTime);
        rpcRequest.paramValues(args);
        rpcRequest.paramTypeNames(ReflectMethodUtil.getParamTypeNames(method));
        rpcRequest.methodName(method.getName());

        // 调用远程
        LOG.info("[Client] start call remote with request: {}", rpcRequest);
        proxyContext.invokeService().addRequest(seqId);

        // 这里使用 load-balance 进行选择 channel 写入。
        final Channel channel = getChannel();
        LOG.info("[Client] start call channel id: {}", channel.id().asLongText());

        // 对于信息的写入,实际上有着严格的要求。
        // writeAndFlush 实际是一个异步的操作,直接使用 sync() 可以看到异常信息。
        // 支持的必须是 ByteBuf
        channel.writeAndFlush(rpcRequest).sync();

        // 循环获取结果
        // 通过 Loop+match  wait/notifyAll 来获取
        // 分布式根据 redis+queue+loop
        LOG.info("[Client] start get resp for seqId: {}", seqId);
        RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);
        LOG.info("[Client] start get resp for seqId: {}", seqId);
        Throwable error = rpcResponse.error();
        if(ObjectUtil.isNotNull(error)) {
            throw error;
        }
        return rpcResponse.result();
    }

    
    private Channel getChannel() {
        return proxyContext.channelFutures().get(0).channel();
    }

    
    @SuppressWarnings("unchecked")
    public static  T newProxyInstance(ProxyContext proxyContext) {
        final Class interfaceClass = proxyContext.serviceInterface();
        ClassLoader classLoader = interfaceClass.getClassLoader();
        Class[] interfaces = new Class[]{interfaceClass};
        ReferenceProxy proxy = new ReferenceProxy(proxyContext);
        return (T) Proxy.newProxyInstance(classLoader, interfaces, proxy);
    }

}

客户端初始化 newProxyInstance 的就是创建的代理的过程。

客户端调用远程方法,实际上是调用 invoke 的过程。

(1)构建反射 invoke 请求信息,添加 reqId

(2)netty 远程调用服务端

(3)同步获取响应信息

测试 引入 maven

    com.github.houbb
    rpc-client
    0.0.6

测试代码
public static void main(String[] args) {
    // 服务配置信息
    ReferenceConfig config = new DefaultReferenceConfig();
    config.serviceId(ServiceIdConst.CALC);
    config.serviceInterface(CalculatorService.class);
    config.addresses("localhost:9527");

    CalculatorService calculatorService = config.reference();
    CalculateRequest request = new CalculateRequest();
    request.setOne(10);
    request.setTwo(20);

    CalculateResponse response = calculatorService.sum(request);
    System.out.println(response);
}

测试日志:

[DEBUG] [2021-10-05 14:16:17.534] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 14:16:17.625] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务开始启动客户端
...
[INFO] [2021-10-05 14:16:19.328] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务启动客户端完成,监听地址 localhost:9527
[INFO] [2021-10-05 14:16:19.346] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2021-10-05 14:16:19.347] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: a525c5a6196545f5a5241b2cdc2ec2c2
[INFO] [2021-10-05 14:16:19.348] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000017bc-00000000-399b9d7e1b88839d-5ccc4a29
十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler write
信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] WRITE: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler flush
信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] FLUSH
[INFO] [2021-10-05 14:16:19.412] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: a525c5a6196545f5a5241b2cdc2ec2c2
[INFO] [2021-10-05 14:16:19.413] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq a525c5a6196545f5a5241b2cdc2ec2c2 对应结果为空,进入等待
十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] READ: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
...
[INFO] [2021-10-05 14:16:19.505] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取结果信息,seq: a525c5a6196545f5a5241b2cdc2ec2c2, rpcResponse: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:16:19.505] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seq 信息已经放入,通知所有等待方
[INFO] [2021-10-05 14:16:19.506] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:16:19.506] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq a525c5a6196545f5a5241b2cdc2ec2c2 对应结果已经获取: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:16:19.507] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: a525c5a6196545f5a5241b2cdc2ec2c2
CalculateResponse{success=true, sum=30}
小结

现在看来有一个小问题,要求服务端必须指定 port,这有点不太合理,比如代理域名,后续需要优化。

这里的启动声明方式也比较基础,后续可以考虑和 spring 进行整合。

为了便于大家学习,以上源码已经开源:

https://github.com/houbb/rpc

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/318315.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号