栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

源码学习Dubbo的线程模型

源码学习Dubbo的线程模型

前言
    本文章从官网介绍的Dubbo线程模型结论出发,了解dubbo是怎样暴露netty服务,接收请求后怎样提交到业务线程池了解dubbo怎样提供Dispatcher的扩展以及ThreadPool的扩展怎样去修改dispatcher和threadpool的默认配置,线程数配置的思考以及出现问题后如何排查
Dubbo线程模型介绍
    如果事件处理的逻辑能迅速完成,并且不会发起新的IO请求,则直接在IO线程上处理更快但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,则必须派发到线程池
暴露服务

    监听到Spring启动事件ContextRefreshedEvent后,暴露服务

    遍历所有服务进行暴露

    2.1 暴露服务会同时暴露本地和远程服务(ServiceConfig#exportUrl)
    (1)本地服务会将url的host改为127.0.0.1,port改为0,protocol改为injvm
    (2)暴露远程服务时,以url的address维度进行暴露(创建服务 DubboProtocol#openServer)

    private void openServer(URL url) {
    	// 192.168.182.1:20880
    	String key = url.getAddress();
    	ProtocolServer server = serverMap.get(key);
    	if (server == null) {
           synchronized (this) {
               server = serverMap.get(key);
               if (server == null) {
                  serverMap.put(key, createServer(url));
               }else {
                  server.reset(url);
               }
            }
       }
    }
    

    2.2 通过初始化NettyServer,然后调用其doOpen开启Netty远程服务
    (1)NettyServer会初始化一个包装handler,用来处理用户请求事件

    // NettyServer构造方法 --> ChannelHandlers#wrap  --> ChannelHandlers#wrapInternal(例如HeartbeatHandler包装AllChannelHandler)
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultframeworkModel().getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
    

    (2)从url中获取dispather参数,默认为all。然后根据扩展参数在org.apache.dubbo.remoting.Dispatcher扩展文件中相应的扩展,通过Dispatcher的dispatch分发handler

    (3)NettyServer服务端会按照端口的维护根据url创建一个业务线程池

    // DefaultExecutorRepository
    public synchronized ExecutorService createExecutorIfAbsent(URL url) {
    	Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
    	URL finalUrl = url;
    	// 从url中获取threadpool参数,默认为fixed --> 根据扩展名在org.apache.dubbo.common.threadpool.ThreadPool文件中找相应的扩展
        ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(finalUrl));
    }
    

处理用户请求
    NettyServerHandler继承netty的ChannelDuplexHandler,在处理netty事件时,转交给handler属性进行处理
    protected void doOpen() throws Throwable {
    	// 将NettyServer作为NettyServerHandler的handler属性进行封装
    	final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    	bootstrap.group(bossGroup, workerGroup)
    				   .childHandler(new ChannelInitializer() {
    				   		@Override
                    		protected void initChannel(SocketChannel ch) throws Exception {
                    			ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    		}
    				   }
    }
    
    direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行
    2.1 配置Dispatcher为direct

    2.2 DirectChannelHandler处理消息
    public void received(Channel channel, Object message) throws RemotingException {
    	// 获取线程池(一般不属于ThreadlessExecutor,所以不会交给业务线程池执行)
    	ExecutorService executor = getPreferredExecutorService(message);
        if (executor instanceof ThreadlessExecutor) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        } else {
            handler.received(channel, message);
        }
    }
    
    扩展业务线程池的线程数以及队列大小
    3.1 以FixedThreadPool为例,默认线程数为200,队列大小为0

    3.2 通过下面的配置可以扩展线程数和队列大小


    3.3 线程池不是配置越大越好,与系统层面以及配置JVM参数相关
    (1)JVM参数:-Xms 初识堆大小 -Xmx最大堆大小 -Xss 每个线程栈大小
    (2)系统最大可创建的线程
    (3)公式线程数量 = (机器本身可用内存 -(JVM分配的堆内存 + JVM元数据去))/Xss的值
扩展

    《记一次敖丙dubbo线程池事故排查》中提到线程池耗尽时伴随一次较久的ygc
    1.1 IO高为什么会导致GC时间长
    (1)JVM GC需要通过发起系统调用write(),来记录GC行为
    (2)write()调用可以被后台磁盘IO所阻塞
    (3)记录GC日志属于JVM停顿的一部分,因此write()调用时间会被计算在JVM STW的停顿时间内

    1.2 在告警时间段发现磁盘IO呈现不间断锯齿状
    (1)磁盘IO较高,导致cpu_iowait变高,cpu等IO,瞬间分配不到时间片,rt就会抖动

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/728330.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号