栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Dubbo消费端线程池模型源码分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Dubbo消费端线程池模型源码分析

前言

    在Dubbo官方文档《消费端线程池模型》中提到2.7.5版本改进了消费端线程池模型,通过复用业务端被阻塞的线程,解决消费端线程数分配过多的问题

    2.7.5版本引入的线程池模型

    2.1 业务线程发出请求,拿到一个Future实例
    2.2 在调用future.get()之前,先调用ThreadlessExecutor.wait(),wait会使业务线程在一个阻塞队列上等待,直到队列中被加入元素

    下面以dubbo3.0.5的版本进行源码调试验证

Dubbo客户端远程调用

    入口在InvokerInvocationHandler#invoke(只展示主要步骤)
    1.1 invoker.invoke(rpcInvocation) 发起远程调用(详细见下面分析)
    1.2 recreate() 处理响应(AsyncRpcResult#recreate)
    (1)调用future.get()获取响应信息AppResponse
    (2)返回AppResponse的result

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    	return invoker.invoke(rpcInvocation).recreate();
    }
    

    发起远程调用入口在AbstractInvoker#invoke

    public Result invoke(Invocation inv) throws RpcException {
    	// do invoke rpc invocation and return async result
        AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
        // wait rpc result if sync  --> ThreadlessExecutor#waitAndDrain
        waitForResultIfSync(asyncResult, invocation);
    }
    

    2.1 发起rpc请求并返回异步结果(DubboInvoker#doInvoke)
    (1)通过channel发送Request请求
    (2)将channel、Request请求以及ThreadlessExecutor封装为DefaultFuture进行返回(详细见获取响应分析)
    (3)thenApply异步将结果将转换为AppResponse

    protected Result doInvoke(final Invocation invocation) throws Throwable {
    	// 返回new ThreadlessExecutor(sharedExecutor)
    	ExecutorService executor = getCallbackExecutor(getUrl(), inv);
    	// 通过netty的channel发送Request请求(HeaderExchangeChannel#request)
    	CompletableFuture appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
    }
    

    2.2 如果是同步调用,则等待rpc返回
    (1)取ThreadlessExecutor阻塞队列queue的任务,没有则阻塞(take会阻塞)
    (2)执行任务(会循环将所有任务执行完,poll取任务,没有任务返回null -> ThreadlessExecutor$RunnableWrapper#run -> ChannelEventRunnable#run)
    (3)通过DecodeHandler#received进行反序列
    (4)DefaultFuture#received调用CompletableFuture的complete方法将反序列化的结果安全写入result属性中

    // ThreadlessExecutor#waitAndDrain
    public void waitAndDrain() throws InterruptedException {
    	 Runnable runnable;
          try {
          	  // 当阻塞队列里没任务时,take方法会阻塞
               runnable = queue.take();
           }catch (InterruptedException e){
               waiting = false;
               throw e;
           }
    
    		synchronized (lock) {
                waiting = false;
                runnable.run();
        	}
    	
    	// 将阻塞队列里的所有任务都执行完,达到复用的作用
        runnable = queue.poll();
        while (runnable != null) {
            runnable.run();
            runnable = queue.poll();
        }
    }
    
获取响应
    DefaultFuture#newFuture将请求ID与channel以及DefaultFuture建立绑定关系
    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
    	// 将请求ID与channel 以及 ID与DefaultFuture 建立绑定关系
      	final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // ThreadlessExecutor needs to hold the waiting future in case of circuit return.
        if (executor instanceof ThreadlessExecutor) {
            ((ThreadlessExecutor) executor).setWaitingFuture(future);
        }
        // 启动超时定时任务TimeoutCheckTask
        timeoutCheck(future);
        return future;
    }
    
    获取响应入口:NettyClientHandler#channelRead
    2.1 收到响应后,最终触发AllChannelHandler#received将返回消息封装成ChannelEventRunnable任务放入阻塞队列queue中
    public void received(Channel channel, Object message) throws RemotingException {
    	// 如果DefaultFuture.getFuture(response.getId())不为空,则获取其Executor(ThreadlessExecutor)
    	ExecutorService executor = getPreferredExecutorService(message);
    	// ThreadlessExecutor#execute 将任务添加到阻塞队列queue中
    	executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    }
    
总结
    通过上述分析,Dubbo通过将响应封装ChannelEventRunnable任务放入阻塞队列,在获取响应时将执行阻塞队列里的所有任务,解决消费端线程数分配过多的问题主要涉及到知识点:netty异步通信以及CompletableFuture异步处理
    2.1 通过netty的channel发送消息
    2.2 将请求ID与channel以及异步结果CompletableFuture绑定
    2.3 netty接收到返回时,根据响应ID找到对应CompletableFuture,将结果写入到CompletableFuture中
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/731519.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号