栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java 从零开始手写 RPC (05) reflect 反射实现通用调用之服务端

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java 从零开始手写 RPC (05) reflect 反射实现通用调用之服务端

通用调用

java 从零开始手写 RPC (01) 基于 socket 实现

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

java 从零开始手写 RPC (03) 如何实现客户端调用服务端?

java 从零开始手写 RPC (04) -序列化

前面我们的例子是一个固定的出参和入参,固定的方法实现。

本节将实现通用的调用,让框架具有更广泛的实用性。

基本思路

所有的方法调用,基于反射进行相关处理实现。

服务端 核心类
  • RpcServer

调整如下:

serverBootstrap.group(workerGroup, bossGroup)
    .channel(NioServerSocketChannel.class)
    // 打印日志
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
            // 解码 bytes=>resp
            .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
             // request=>bytes
             .addLast(new ObjectEncoder())
             .addLast(new RpcServerHandler());
        }
    })
    // 这个参数影响的是还没有被accept 取出的连接
    .option(ChannelOption.SO_BACKLOG, 128)
    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
    .childOption(ChannelOption.SO_KEEPALIVE, true);

其中 ObjectDecoder 和 ObjectEncoder 都是 netty 内置的实现。

RpcServerHandler
package com.github.houbb.rpc.server.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


public class RpcServerHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel {} connected " + id);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel read start: {}", id);

        // 接受客户端请求
        RpcRequest rpcRequest = (RpcRequest)msg;
        log.info("[Server] receive channel {} request: {}", id, rpcRequest);

        // 回写到 client 端
        DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest);
        ctx.writeAndFlush(rpcResponse);
        log.info("[Server] channel {} response {}", id, rpcResponse);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    
    private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) {
        DefaultRpcResponse rpcResponse = new DefaultRpcResponse();
        rpcResponse.seqId(rpcRequest.seqId());

        try {
            // 获取对应的 service 实现类
            // rpcRequest=>invocationRequest
            // 执行 invoke
            Object result = DefaultServiceFactory.getInstance()
                    .invoke(rpcRequest.serviceId(),
                            rpcRequest.methodName(),
                            rpcRequest.paramTypeNames(),
                            rpcRequest.paramValues());
            rpcResponse.result(result);
        } catch (Exception e) {
            rpcResponse.error(e);
            log.error("[Server] execute meet ex for request", rpcRequest, e);
        }

        // 构建结果值
        return rpcResponse;
    }

}

和以前类似,不过 handleRpcRequest 要稍微麻烦一点。

这里需要根据发射,调用对应的方法。

pojo

其中使用的出参、入参实现如下:

RpcRequest
package com.github.houbb.rpc.common.rpc.domain;

import java.util.List;


public interface RpcRequest extends baseRpc {

    
    long createTime();

    
    String serviceId();

    
    String methodName();

    
    List paramTypeNames();

    // 调用参数信息列表

    
    Object[] paramValues();

}
RpcResponse
package com.github.houbb.rpc.common.rpc.domain;


public interface RpcResponse extends baseRpc {

    
    Throwable error();

    
    Object result();

}
baseRpc
package com.github.houbb.rpc.common.rpc.domain;

import java.io.Serializable;


public interface baseRpc extends Serializable {

    
    String seqId();

    
    baseRpc seqId(final String traceId);

}
ServiceFactory-服务工厂

为了便于对所有的 service 实现类统一管理,这里定义 service 工厂类。

ServiceFactory
package com.github.houbb.rpc.server.service;

import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.registry.ServiceRegistry;

import java.util.List;


public interface ServiceFactory {

    
    ServiceFactory registerServices(final List serviceConfigList);

    
    Object invoke(final String serviceId, final String methodName,
                  List paramTypeNames, final Object[] paramValues);

}
DefaultServiceFactory

作为默认实现,如下:

package com.github.houbb.rpc.server.service.impl;

import com.github.houbb.heaven.constant.PunctuationConst;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
import com.github.houbb.heaven.util.util.CollectionUtil;
import com.github.houbb.rpc.common.exception.RpcRuntimeException;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.service.ServiceFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class DefaultServiceFactory implements ServiceFactory {

    
    private Map serviceMap;

    
    private Map methodMap;

    private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory();

    private DefaultServiceFactory(){}

    public static DefaultServiceFactory getInstance() {
        return INSTANCE;
    }

    
    @Override
    public synchronized ServiceFactory registerServices(List serviceConfigList) {
        ArgUtil.notEmpty(serviceConfigList, "serviceConfigList");

        // 集合初始化
        serviceMap = new HashMap<>(serviceConfigList.size());
        // 这里只是预估,一般为2个服务。
        methodMap = new HashMap<>(serviceConfigList.size()*2);

        for(ServiceConfig serviceConfig : serviceConfigList) {
            serviceMap.put(serviceConfig.id(), serviceConfig.reference());
        }

        // 存放方法名称
        for(Map.Entry entry : serviceMap.entrySet()) {
            String serviceId = entry.getKey();
            Object reference = entry.getValue();

            //获取所有方法列表
            Method[] methods = reference.getClass().getMethods();
            for(Method method : methods) {
                String methodName = method.getName();
                if(ReflectMethodUtil.isIgnoreMethod(methodName)) {
                    continue;
                }

                List paramTypeNames = ReflectMethodUtil.getParamTypeNames(method);
                String key = buildMethodKey(serviceId, methodName, paramTypeNames);
                methodMap.put(key, method);
            }
        }

        return this;
    }


    @Override
    public Object invoke(String serviceId, String methodName, List paramTypeNames, Object[] paramValues) {
        //参数校验
        ArgUtil.notEmpty(serviceId, "serviceId");
        ArgUtil.notEmpty(methodName, "methodName");

        // 提供 cache,可以根据前三个值快速定位对应的 method
        // 根据 method 进行反射处理。
        // 对于 paramTypes 进行 string 连接处理。
        final Object reference = serviceMap.get(serviceId);
        final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames);
        final Method method = methodMap.get(methodKey);

        try {
            return method.invoke(reference, paramValues);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RpcRuntimeException(e);
        }
    }

    
    private String buildMethodKey(String serviceId, String methodName, List paramTypeNames) {
        String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT);
        return serviceId+PunctuationConst.colon+methodName+PunctuationConst.colon
                +param;
    }

}
ServiceRegistry-服务注册类 接口
package com.github.houbb.rpc.server.registry;


public interface ServiceRegistry {

    
    ServiceRegistry port(final int port);

    
    ServiceRegistry register(final String serviceId, final Object serviceImpl);

    
    ServiceRegistry expose();

}
实现
package com.github.houbb.rpc.server.registry.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.rpc.common.config.protocol.ProtocolConfig;
import com.github.houbb.rpc.server.config.service.DefaultServiceConfig;
import com.github.houbb.rpc.server.config.service.ServiceConfig;
import com.github.houbb.rpc.server.core.RpcServer;
import com.github.houbb.rpc.server.registry.ServiceRegistry;
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;

import java.util.ArrayList;
import java.util.List;


public class DefaultServiceRegistry implements ServiceRegistry {

    
    private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry();

    
    private int rpcPort;

    
    private ProtocolConfig protocolConfig;

    
    private List serviceConfigList;

    private DefaultServiceRegistry(){
        // 初始化默认参数
        this.serviceConfigList = new ArrayList<>();
        this.rpcPort = 9527;
    }

    public static DefaultServiceRegistry getInstance() {
        return INSTANCE;
    }

    @Override
    public ServiceRegistry port(int port) {
        ArgUtil.positive(port, "port");

        this.rpcPort = port;
        return this;
    }

    
    @Override
    @SuppressWarnings("unchecked")
    public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) {
        ArgUtil.notEmpty(serviceId, "serviceId");
        ArgUtil.notNull(serviceImpl, "serviceImpl");

        // 构建对应的其他信息
        ServiceConfig serviceConfig = new DefaultServiceConfig();
        serviceConfig.id(serviceId).reference(serviceImpl);
        serviceConfigList.add(serviceConfig);

        return this;
    }

    @Override
    public ServiceRegistry expose() {
        // 注册所有服务信息
        DefaultServiceFactory.getInstance()
                .registerServices(serviceConfigList);

        // 暴露 netty server 信息
        new RpcServer(rpcPort).start();
        return this;
    }

}

ServiceConfig 是一些服务的配置信息,接口定义如下:

package com.github.houbb.rpc.server.config.service;


public interface ServiceConfig {

    
    String id();

    
    ServiceConfig id(String id);

    
    T reference();

    
    ServiceConfig reference(T reference);

}
测试 maven 引入

引入服务端的对应 maven 包:


    com.github.houbb
    rpc-server
    0.0.6

服务端启动
// 启动服务
DefaultServiceRegistry.getInstance()
        .register(ServiceIdConst.CALC, new CalculatorServiceImpl())
        .expose();

这里注册了一个计算服务,并且设置对应的实现。

和以前实现类似,此处不再赘述。

启动日志:

[DEBUG] [2021-10-05 13:39:42.638] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 13:39:42.645] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端
十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xec4dc74f] REGISTERED
十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xec4dc74f] BIND: 0.0.0.0/0.0.0.0:9527
十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xec4dc74f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
[INFO] [2021-10-05 13:39:43.893] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动完成,监听【9527】端口

ps: 写到这里忽然发现忘记添加对应的 register 日志了,这里可以添加对应的 registerListener 拓展。

小结

为了便于大家学习,以上源码已经开源:

https://github.com/houbb/rpc

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/314920.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号