栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty核心组件之ChannelPipeline源码分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty核心组件之ChannelPipeline源码分析

2021SC@SDUSC

因为经过了第一个部分的分析之后,发现组内同学的代码分析是有重复的地方。因此我又重新梳理了自己负责的分析内容 :

1. 核心组件

  •      EventLoop 和 EventLoopGroup(已分析)
  •      ChannelPipeline(本节分析)
  •      ChannelHandler 
  •      ChannelHandlerContext

2. ChannelHandler责任链模式的过滤链

3. 通信协议和私有协议栈开发


我们这次的主要进行ChannelPipeline的源码分析。

在通过前面的分析之后,我想如果把Netty比喻成一个人,那么 EventLoop 就是这个人的大脑,负责这个人的所有操作。而我们后面分析的组件更像是“器官”的功能,进行更具体的工作。今天我们就来进行ChannelPipeline的源码分析。

目录

一、ChannelPipeline、ChannelHandler、ChannelHandlerContext 关系分析

二、ChannelPipeline

 1. ChannelPipeline创建时机

          2. ChannelPipeline初始化

          3.ChannelPipeline工作原理分析

   (1)工作流程简介

   (2)ChannelHandler添加

   (3)ChannelHandler添加步骤详解——addLast方法详解

               3.1  包裹 并 加入链表

               3.2  注册完成回调事件

               3.3  回调添加完成事件

  (5)ChannelPipeline的出入站

 

           三、小结


一、ChannelPipeline、ChannelHandler、ChannelHandlerContext 关系分析

在分析之前,我们先来理清这三个组件之间的关系:

目前为止,我们分析了Netty的EventLoop的实现,以及一些关于EventLoopGroup的内容。知道了一个Channel是怎么被分配给一个EventLoop来支持读写事件的,以及一个Channel的事件循环是如何运转起来的。

现在来分析,服务端accept一个新的Channel之后,需要监听这个Channel上的事件,并且将所发生的事件投递到正确的handler上来处理,Netty使用ChannelHandler组件来处理Channel上发生的事件,处理完成之后再将结果发送出去,而ChannelPipeline就是将多个ChannelHandler组合在一起,形成一个链条,这个链条会拦截Channel上的事件,然后在链条中传播。

ChannelHandler Context是一个特别重要的组件,首先,每一个新创建的Channel都会分配给一个ChannelPipeline,而ChannelHandlerContext将ChannelHandler和ChannelPipeline联系起来,ChannelHandlerContext提供了和ChannelPipeline类似的方法,但是调用ChannelHandlerContext上的方法只会从当前ChannelHandler开始传播,并且只会传播到下一个ChannelHandler上,而调用ChannelPipeline上的方法会沿着链条一直传递下去。

ChannelHandler分为ChannelInboundHandler和ChannelOutboundHandler,分别代表入站处理器和出站处理器,在实际应用中,我们没必要直接实现ChannelInboundHandler接口或者ChannelOutboundHandler接口,下面的图片展示了Netty为我们提供的一些实现子类:

对于入站处理,我们只需要继承ChannelInboundHandlerAdapter就可以了,然后重写我们需要的方法,对于出站处理来说,只需要继承ChannelOutboundHandlerAdapter子类,然后重写需要的接口就可以了。如果我们不再希望将事件传递下去,你应该在处理完消息之后释放它,否则这个消息会一直传递下去直到最后一个处理器。

关系如下图所示:

上面是我通过查阅资料了解的大体关系,下面从源码来逐步分析是如何实现上述功能的。

二、ChannelPipeline

ChannelPipeline将多个ChannelHandler链接在一起来让事件在其中传播处理。一个ChannelPipeline中可能不仅有入站处理器,还有出站处理器,入站处理器只会处理入站的事件,而出站处理器只会处理出站的数据,下面展示了一个同时具有入站处理器和出站处理器的ChannelPipeline:

现在来分析一下netty如何实现ChannelPipeline。

1. ChannelPipeline创建时机

首先还是从示例demo出发,观察ChannelPipeline是何时被创建的。

示例来自netty提供的测试程序,这里只保留了程序的关键部分:

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(8);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)//new ReflectiveChannelFactory(channelClass)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))// ServerSocketChannel 专属
             .childHandler(new ChannelInitializer() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception { // SocketChannel 专属
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoServerHandler());
                 }
             });
           // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
 
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

 这里注意childHandler这个方法:

.childHandler(new ChannelInitializer() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception { // SocketChannel 专属
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoServerHandler());
                 }
             });

这个方法里面传递了一个ChannelInitializer类型的对象,我们来看一下ChannelInitializer的继承结构,继承了ChannelInboundHandlerAdapter类 , 如下图所示:

ChannelInitializer继承了ChannelInboundHandlerAdapter类 , 所以它是一个入站处理器,这里重写了initChannel方法,当这个参数调用它的initChannel方法的时候就会初始化ChannelPipeline,那么什么时候会调用initChannel这个方法呢?

我们进入ChannelInitializer源码中查看:

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
        // the handler.
        if (initChannel(ctx)) {
            // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
            // miss an event.
            ctx.pipeline().fireChannelRegistered();

            // We are done with init the Channel, removing all the state for the Channel now.
            removeState(ctx);
        } else {
            // Called initChannel(...) before which is the expected behavior, so just forward the event.
            ctx.fireChannelRegistered();
        }
    }

在调用channelRegistered方法时会调用initChannel方法,由继承层次依次查看,可以发现这个调用过程(自下而上)如下:

   -> ChannelInitializer.initChannel
   -> ChannelInitializer.channelRegistered
   -> AbstractChannelHandlerContext.invokeChannelRegistered
   -> AbstractChannelHandlerContext.fireChannelRegistered
   -> AbstractChannel.AbstractUnsafe.register0
   -> AbstractChannel.AbstractUnsafe.register
   -> AbstractBootstrap.initAndRegister
   -> AbstractBootstrap.doBind
   -> AbstractBootstrap.bind

ChannelPipeline的创建是在AbstractChannel类的构造函数中完成的:

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();   //开始创建Pipeline对象
}

在其子类NioServerSocketChannel实例化的时候进行创建。

2. ChannelPipeline初始化

在Netty中每个Channel都有且仅有一个ChannelPipeline 与之对应,它们的组成关系如下:

通过上图我们可以看到,一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext中又关联着一个ChannelHandler。

下面的代码是AbstractChannel构造器:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

AbstractChannel有一个pipeline字段,在构造器中会初始化它为DefaultChannelPipeline的实例,这里的代码就印证了一点:每个Channel都有一个ChannelPipeline。
接着我们跟踪一下 DefaultChannelPipeline的初始化过程:

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
 
        tail = new TailContext(this);
        head = new HeadContext(this);
 
        head.next = tail;
        tail.prev = head;
}

在DefaultChannelPipeline构造器中,首先将与之关联的Channel保存到字段channel中,然后实例化两个ChannelHandlerContext,一个是HeadContext实例head,一个是TailContext实例tail。接着将head和tail互相指向,构成一个双向链表。

其中tail和head的类分别是TailContext和HeadContext,我们来看一下这两个类的继承结构:

从上图可以看出,TailCotext这个类只实现了ChannelInboundHandler这个接口,说明这个类仅仅是一个Inbound事件处理器。

从上图中可以看出,HeadContext这个类不但实现了ChannelOutboundHadler接口,并且也实现了ChannenlInboundHandler接口,表明这个类可以同时处理inbound以及outbound事件。但是在具体的实现逻辑中,这个类通常被拿来处理outbound事件。

3.ChannelPipeline工作原理分析

(1)工作流程简介

 在ChannelPipeline源码阅读中,我看到了这个图。

通过查阅资料,我们从上图可以看出 消息读取和发送处理全流程为:

底层的 SocketChannel.read()方法 读取 ByteBuf,触发 ChannelRead事件,由 IO线程 NioEventLoop 调用 ChannelPipeline 的 fireChannelRead(Object msg)方法,将消息传输到 ChannelPipeline 中。


消息依次被 HeadHandler、ChannelHandler1、ChannelHandler2 … TailHandler 拦截和处理,在这个过程中,任何 ChannelHandler 都可以中断当前的流程,结束消息的传递。


调用 ChannelHandlerContext 的 write方法 发送消息,消息从 TailHandler 开始途经 ChannelHandlerN … ChannelHandler1、HeadHandler,最终被添加到消息发送缓冲区中等待刷新和发送,在此过程中也可以中断消息的传递,例如当编码失败时,就需要中断流程,构造异常的Future返回。

(2)ChannelHandler添加

从上面不难看出,ChannelPipeline处理的核心是一连串的ChannelHandler。那么ChannelHandler是在哪里添加上去的?

最开始的时候ChannelPipeline中含有两个ChannelHandlerContext(head和tail),但是这个Pipeline并不能实现什么特殊的功能,因为我们还没有给它添加自定义的ChannelHandler。

这里就要考虑客户端了。首先在用户使用 Netty 时,用户不需要自己创建 pipeline,因为从第一部分的创建时间的分析中可以知道,在使用 ServerBootstrap 或者 Bootstrap 进行配置后,Netty 会为每个 Channel连接 创建一个独立的pipeline。即我们在第二部分所讨论的。这些初始化的工作是由服务端来实现的。对于客户端来说,用户只需将自定义的 ChannelHandler 加入到 pipeline 即可。通常来说,用户在初始化Bootstrap时,会添加自定义的ChannelHandler。由于 ChannelHandler 中的事件种类繁多,不同的 ChannelHandler 可能只需要关心其中的个别事件,所以,自定义的ChannelHandler 只需要继承 ChannelInboundHandlerAdapter / ChannelOutboundHandlerAdapter,覆盖自己关心的方法即可。

我们在这里用EchoClient类来看一下 ChannelHandler的添加过程:

// Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

可以看出在调用handler时,传入了ChannelInitializer对象,它提供了一个initChannel方法供我们初始化ChannelHandler,这个部分和我们前面分析的服务端相同,不过这里是从客户端的角度来看的。这里handler方法是什么时候被调用的呢?

通过前面对服务端的调用流程进行了分析,这里我们可以比较熟练的找到,它是在Bootstrap.init()中添加到ChannelPipeline中的。

 @Override
    void init(Channel channel) {
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());

        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());
    }

这里代码将handler()返回的ChannelHandler,其实就是我们在初始化Bootstrap调用handler设置的ChannelInitializer实例,因此这里就是将ChannelInitializer插入到了Pipeline的末端。

(3)ChannelHandler添加步骤详解——addLast方法详解

先附上完整addLast方法代码如下:

   @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

3.1  包裹 并 加入链表

刚刚讨论是将是将ChannelInitializer插入到了Pipeline的末端,但是ChannelPipeline上插入的应该是ChannelHandlerContext(包含ChannelHandler)才对,为什么这里插入的是ChannelInitializer实例?这里我们就要来看一下这里使用的addLast方法了,是如何把handler包装成ChannelHandlerContext的。

这里主要关注这三行:

            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

这个addLast方法中,首先检查这个ChannelHandler的名字是否是重复的,如果不重复的话,则为这个handler创建一个对应的DefaultChannelHandlerContext实例,通过this.handler = handler与之关联起来(Context中有一个handler属性保存着对应的Handler实例)。

DefaultChannelHandlerContext源码如下:

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
    super(pipeline, group, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

当创建好ChannelHandlerContext后,就将这个ChannelHandlerContext 插入到ChannelPipeline的双向链表中:

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}

这里就是真正向ChannelPipeline中插入的方法了。中间的四步是很明显的向双向链表中插入的操作。将新插入节点的prev指向前一个节点,后一个节点指向tail,前一个节点的next指向新插入节点,tail的prev指向新插入节点。完成个插入操作。

此时Pipeline的结构如下图所示:

 分析到这里,可以看出我们向ChannelPipeline中插入的的确是ChannelHandlerContext。

3.2  注册完成回调事件

这一部分关注如下代码:

 // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }

这里的注释还是很清楚的。

如果 channel 尚未注册到 EventLoop,就添加一个任务到 PendingHandlerCallback 上,后续channel 注册完毕,再调用 ChannelHandler.handlerAdded。

3.3  回调添加完成事件

关注代码如下:

callHandlerAdded0(newCtx);
 private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
            ...
            }
         ...
    }

如果已经注册,马上调用 callHandlerAdded0 方法来执行 ChannelHandler.handlerAdded 注册完成的回调函数。

到这里就比较完整的分析完一个handler添加的全过程了。

(5)ChannelPipeline的出入站

除了关联handler之后,这里还有两个方法也和handler有关,我们一起来看一下:

private static boolean isInbound(ChannelHandler handler) {
    return handler instance ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instance ChannelOutboundHandler;
}

从源码中可以看出,当一个handler实现了ChannelInboundHandler接口,则isInbound返回。当一个handler实现了ChannelOutboundHandler接口,则isOutbound就返回真。而这两个boolean变量会传递到父类AbstractChannelHandlerContext中,初始化父类的两个字段:inbound和outbound。

在第一部分中,我们就查看了ChannelInitializer的代码,仅仅实现了ChannelInboundHandler接口,因此这里实例化的DefaultChannelHandlerContext的inbound是true,outbound是false。所以这里的两个变量是什么意思呢?

从我最开始贴出的在ChannelPipelin源码中贴出的图可以看出Netty的事件可以分为Inbound和outbound事件。

从上图可以看到:

  • inbound事件和outbound事件的流向是不一样的,inbound事件的流向是从下至上,而outbound刚好相反,是自下而上。
  • inbound的传递方式是通过调用相应ChannelHandlerCountext.fireIN_EVT()方法,而outbound方法的传递方式是通过调用ChannelHandlerContext .OUT_EVT()方法。

入站:当查询到Java NIO底层Channel的就绪事件时,通过一系列的ChannelInboundHandler处理器,完成底层就绪事件的处理。比方说底层连接建立事件、底层连接断开事件、从底层读写就绪事件等等。入站处理通常由底层Java NIO channel触发。

出站:当需要Netty Channel需要操作Java NIO底层Channel时,通过一系列的ChannelOutboundHandler处理器,完成底层操作。比方说建立底层连接、断开底层连接、从底层Java NIO通道读入、写入底层Java NIO通道等。

这个从我们的角度来理解就是数据有入站和出站两种类型。这里的出站和入站,不是网络通信方面的入站和出站。而是相对于Netty Channel与Java NIO Channel而言的。

数据入站,指的是数据从底层的Java NIO channel到Netty的Channel。

数据出站,指的是通过Netty的Channel来操作底层的 Java NIO chanel。

从入站和出战的角度出发,Netty中的ChannelHandler主要由两种类型,ChannelInboundHandler和ChannelOutboundHandler。
 

所以我们刚刚讨论的 inbound,outbound两个字段的含义如下:

  • inbound为真时,表示对应的ChannelHandler实现了ChannelInboundHandler
  • outbound为真时,表示对应的Channelhandler实现了ChannelOutboundHandler

inbound事件传播方法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

Outbound事件传输方法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)

这里其他具体的代码分析,由我们组另外一位同学进行。

三、小结

这一部分的内容确实写的非常多,为了加快速度,将很多内容都在这一节博客中讲完。现在进行一个梳理:

1. 从应用程序开发人员的角度来看,Netty的主要组件是ChannelHandler,所以,对ChannelHandler的分类,也是从应用开发的角度来的。

数据有入站和出站两种类型。Netty中的ChannelHandler主要有两种类型,ChannelInboundHandler和ChannelOutboundHandler。产生一的一系列事件将由ChannelHandler所对应的API处理。查询到Java NIO底层Channel的就绪事件时,通过一系列的ChannelInboundHandler处理器,完成底层就绪事件的处理。当需要Netty Channel需要操作Java NIO底层Channel时,通过一系列的ChannelOutboundHandler处理器。

2. 一个Channel在数量上,肯定不止拥有一个Handler。 如何将杂乱无章的Handler,有序的组织起来呢?

需要一个Handler的装配器——Pipeline。

Netty中, 使用一个双向链表,将属于一个Channel的所有Handler组织起来,并且给这个双向链表封装在一个类中,这个类就是ChannelPipeline。

实际上这里用了Java中一种非常重要的设计模式,Pipeline设计模式。

在Netty的设计中,Handler是无状态的,不保存和Channel有关的信息。Handler的目标,是将自己的处理逻辑做得很完成,可以给不同的Channel使用。

与之不同的是,Pipeline是有状态的,保存了Handler和Channel的关系。

因此,Handler和Pipeline之间,需要一个中间角色,把他们联系起来。这个中间角色就是——ChannelHandlerContext 。

所以,ChannelPipeline 中维护的,是一个由 ChannelHandlerContext 组成的双向链表。这个链表的头是 HeadContext, 链表的尾是 TailContext。而无状态的Handler,作为Context的成员,关联在ChannelHandlerContext 中。在对应关系上,每个 ChannelHandlerContext 中仅仅关联着一个 ChannelHandler。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/439899.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号