栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty核心组件之EventLoop和EventLoopGroup源码分析(一)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty核心组件之EventLoop和EventLoopGroup源码分析(一)

2021SC@SDUSC

在这次博客中,着重介绍netty最为核心的组件EventLoop和EventLoopGroup。

目录

一. 引入 示例

二. 分析

1.NioEventLoop与NioEventLoopGroup的关系

2.NioEventLoop的构造函数

3.NioEventLoopGroup中的newChild函数


一.  引入示例

netty 提供了大量的 demo 供用户使用和测试,我们今天就从下图netty提供的EchoServer测试类出发,分析EventLoop和EventLoopGroup这两个核心组件。

类代码:

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    
    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(8);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)//new ReflectiveChannelFactory(channelClass)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))// ServerSocketChannel 专属
             .childHandler(new ChannelInitializer() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception { // SocketChannel 专属
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoServerHandler());
                 }
             });
           // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

二. 分析

在main方法中,首先创建了关于SSL 的配置类,之后创建了两个EventLoopGroup 对象:

      // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(8);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16);

这两个对象是整个 Netty 的核心对象,可以说,整个 Netty 的运作都依赖于他们。bossGroup 用于接受 Tcp 请求,他会将请求交给 workerGroup ,workerGroup 会获取到真正的连接,然后和连接进行通信,比如读写解码编码等操作。

1.NioEventLoop与NioEventLoopGroup的关系

EventLoopGroup构造:

首先EventLoopGroup声明如下:

      // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(8);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16);

我们进入 NioEventLoopGroup类中查看:

NioEventLoopGroup的继承结构:

 

NioEventLoopGroup中包含的方法:

2.NioEventLoop的构造函数
    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    
    public NioEventLoopGroup(ThreadFactory threadFactory) {
        this(0, threadFactory, SelectorProvider.provider());
    }

参数最全的构造函数:

   public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             SelectorProvider selectorProvider,
                             SelectStrategyFactory selectStrategyFactory,
                             RejectedExecutionHandler rejectedExecutionHandler,
                             EventLoopTaskQueueFactory taskQueueFactory,
                             EventLoopTaskQueueFactory tailTaskQueueFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

NioEventLoop构造函数非常多,每个参数都可以定制,我就不全贴出来了,最后回到这个参数最全的构造函数,下面我们挨个解释每个参数的作用:

  • nThreads: 线程数,对应EventLoop的数量,为0时 默认数量为CPU核心数*2
  • executor: 这个我们再熟悉不过,最终用来执行EventLoop的线程
  • chooserFactor: 当我们提交一个任务到线程池,chooserFactor会根据策略选择一个线程来执行
  • selectorProvider:用来实例化jdk中的selector,没一个EventLoop都有一个selector
  • selectStrategyFactory:用来生成后续线程运行时对应的选择策略工厂
  • rejectedExecutionHandler:跟jdk中线程池中的作用一样,用于处理线程池没有多余线程的情况,默认直接抛出异常
 

上面的这些都是一些重载的构造方法,并加入了一些默认值,比如为null 的 executor,也有一个单例的选择策略工厂,还有一个默认的线程池拒绝策略等。下面才是 NioEventLoopGroup 真正的构造方法,在抽象父类MultithreadEventExecutorGroup中,代码如下 :

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 创建nThreads大小的EventLoop数组
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 创建具体的EventLoop,会调用子类NioEventLoopGruop中的方法
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // 如果其中有一个创建失败,把之前创建好的都关闭掉
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // 把刚才创建好的EventLoop提供给EventExecutorChooser,用于后续选择
    chooser = chooserFactory.newChooser(children);

    // 添加一个EventLoop监听器,用来监听EventLoop终止状态
    final FutureListener terminationListener = new FutureListener() {
        @Override
        public void operationComplete(Future future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        // 循环加入
        e.terminationFuture().addListener(terminationListener);
    }
    // 将EventLoop数组转成一个只读的set
    Set childrenSet = new linkedHashSet(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
} 

分析:

     这部分代码很长,我们拆分来看:

1.如果 executor 是null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

2.根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。

children = new EventExecutor[nThreads];

3.循环填充数组中的元素。如果异常,则关闭所有的单例线程池。

4.根据线程选择工厂创建一个 线程选择器,默认是对2取余(位运算),也可以顺序获取。

 // 把刚才创建好的EventLoop提供给EventExecutorChooser,用于后续选择
    chooser = chooserFactory.newChooser(children);

5.为每一个单例线程池添加一个关闭监听器。

 // 添加一个EventLoop监听器,用来监听EventLoop终止状态
    final FutureListener terminationListener = new FutureListener() {
        @Override
        public void operationComplete(Future future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    }; 

6.将所有的单例线程池添加到一个 HashSet 中。

// 将EventLoop数组转成一个只读的set
    Set childrenSet = new linkedHashSet(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);

3.NioEventLoopGroup中的newChild方法

  当一个新的Channel连接时,NioEventLoopGroup需要拿出一个NioEventLoop让Channel绑定,这个Channel之后的IO操作都在这个NioEventLoop上操作。这里就调用了newChild方法,代码如下:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
  • 可以发现就是new NioEventLoop
  • 可以看出先用来创建EventLoopGroup的参数其实都是用来创建EventLoop的

跟进NioEventLoop的构造函数

分析注释在代码中

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
     	//调用父类的构造方法,主要是将保存 线程组 NioEventLoopGroup
    	//创建一个21亿的任务队列
    	//executor负责创建线程执行器也保存起来。
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        
    
        //provider=SelectorProvider.provider() jdk自带的用来创建ServerSocketChannel
        provider = selectorProvider;
    	//openSelector方法负责将jdk自带的hashSet结构的selectedKeys 也封装成netty自己定义的SelectedSelectionKeySet里面是一个数组结构,这个优化是可以配置的。
        final SelectorTuple selectorTuple = openSelector();
        //替换了数据结构selectedKeys   publicSelectedKeys的原生selector
        selector = selectorTuple.selector;
        //子类包装的selector  底层数据结构也是被替换了的
        unwrappedSelector = selectorTuple.unwrappedSelector;
        //selectStrategy=new DefaultSelectStrategyFactory()
        selectStrategy = strategy;
    }

  newChild的大致流程:

  • new NioEventLoop
    • 保存前面创建的ThreadPerTaskExcutor
    • 创建MpscQueue(任务队列)
    • 创建selector

 由于篇幅原因,关于NioEventLoop部分的分析,我们在下一篇博客中进行。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号