栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

dubbo源码分析第十二篇一dubbo远程调用第二小节一消费端rpc层发送消息核心源码解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

dubbo源码分析第十二篇一dubbo远程调用第二小节一消费端rpc层发送消息核心源码解析

文章目录

流程图示源码分析

InvokerInvocationHandler.invokeMockClusterInvoker.invoke容错降级AbstractClusterInvoker.invoke 获取负载均衡实现以及服务端集合FailoverClusterInvoker.invoke 支持重试的clusterInvokerCallbackRegistrationInvoker监听过滤AsyncToSyncInvoker.invoke异步转同步dubboInvoker.doinvoke调用remote层远程通信 总结

流程图示

源码分析 InvokerInvocationHandler.invoke

代理入口,对方法的调用,代理拦截后调用invoker.invoke

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    ...... 删除equals hashcode等方法调用 
    调用MockClusterInvoker 构建调用会话
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
MockClusterInvoker.invoke容错降级

配置force则直接降级配置fail则失败降级无配置正常调用服务端

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    获取容错降级配置
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
        step-1: 无配置直接调用
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        step-2: 强制降级直接走降级逻辑 一般配置于dubbo面板
        result = doMockInvoke(invocation, null);
    } else {
       step-3: 容错直接调用 异常走降级逻辑
        try {
            result = this.invoker.invoke(invocation);
            区分rpc业务异常与远程调用异常
            if(result.getException() != null && result.getException() instanceof RpcException){
                RpcException rpcException= (RpcException)result.getException();
                if(rpcException.isBiz()){
                    throw  rpcException;
                }else {
                    result = doMockInvoke(invocation, rpcException);
                }
            }

        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}
AbstractClusterInvoker.invoke 获取负载均衡实现以及服务端集合

通过directory获取服务端所有注册的提供者通过url获取负载均衡实现执行doinvoke

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();
    添加上下文附件到远程会话对象上
    Map contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    通过directory获取服务端所有注册的提供者
    List> invokers = list(invocation);
    通过url获取负载均衡实现
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}
FailoverClusterInvoker.invoke 支持重试的clusterInvoker

循环重试通过负载均衡选择Invoker执行Invoker调用

 public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
    List> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    
    RpcException le = null;
    List> invoked = new ArrayList>(copyInvokers.size()); // invoked invokers.
    Set providers = new HashSet(len);
    len表示重试次数
    for (int i = 0; i < len; i++) {
        ...... 删除部分代码
        负载均衡选择
        Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            ...... 删除部分代码
            throw e;
        } 
    }
    ...... 删除部分代码
}
CallbackRegistrationInvoker监听过滤

过滤器链执行 最后执行下一个InvokerListenableFilter能够对asyncResult进行监听[这里代码删除]

  @Override
        public Result invoke(Invocation invocation) throws RpcException {
            过滤器链执行 最后执行dubboInvoker
            Result asyncResult = filterInvoker.invoke(invocation);
            ...... 删除具有监听功能的过滤器
            return asyncResult;
        }
AsyncToSyncInvoker.invoke异步转同步

一般默认是同步模式通过future机制实现异步

public Result invoke(Invocation invocation) throws RpcException {
    asyncResult是一个future
    Result asyncResult = invoker.invoke(invocation);
    如果同步则无限等待获取结果
  
    if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
        asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
    ...... 删除异常代码
    如果异步直接返回future
    return asyncResult;
}
dubboInvoker.doinvoke调用remote层远程通信

调用通信层进行远程通信返回future对象,上层Invoker通过get阻塞

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    接口名+端口号
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);
    获取ExchangeClient
    ...... 删除client的选择 一般默认就一个共享连接
    ...... 删除部分代码
    boolean isoneway = RpcUtils.isOneway(getUrl(), invocation);
    int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
    区分单边,比如心跳包等
    if (isOneway) {
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    } else {
        核心
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
        进行请求
        CompletableFuture responseFuture = currentClient.request(inv, timeout);
        asyncRpcResult.subscribeTo(responseFuture);
        FutureContext.getContext().setCompatibleFuture(responseFuture);
        返回请求future对象,responseFuture并没有真正请求完毕,在异步转同步Invoker处阻塞
        return asyncRpcResult;
    }
   
}
 
总结 

除去FailoverClusterInvoker还有多种容错策略整个容错核心在于 ***ClusterInvoker整个负载均衡核心在于loadbalance.selectdirectory.list出了列举服务提供者集合,还处理了router路由过滤[tag,script,condition等]

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号