栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty4.1源码分析—— 服务端构建连接

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty4.1源码分析—— 服务端构建连接

文章目录
  • 一、引言与结论
  • 二、构建连接过程
    • 2.1 创建SocketChannel
    • 2.2 DefaultMaxMessagesRecvByteBufAllocator类
      • 2.2.1 incMessagesRead方法
      • 2.2.2 continueReading方法
      • 2.2.3 lastBytesRead方法
      • 2.2.4 attemptedBytesRead方法
    • 2.3 如何初始化SocketChannel?
  • 三、总结

本文只代表笔者一人的理解和叙述,笔者功力尚浅,如有错误,还请各位大神斧正。
阅读本篇文章前需阅读:
Netty4.1源码分析—— 服务端启动

一、引言与结论

从Netty4.1源码分析—— 服务端启动分析得出,启动过程的实质是创建ServerSocketChannel并将该channel注册上OP_ACCEPT事件,说到底就是为即将到来的socket连接做好准备。

不同的socketChannel承担着不同的责任,我们在Netty中常说的连接则是指的是服务端和客户端的SocketChannel,客户端和服务端的SocketChannel是一对一的关系,所以ServerSocketChannel所做的准备实质上是为了创建和初始化SocketChannel所做的准备。

当我们分析某项事物所做的一系列动作的时候,不妨先来了解下它的最终目的。客户端和服务端构建完连接后,下一步的目的是开始接收/发送数据了,所以服务端构建连接其实也是在为下一步做准备——为接收数据做准备,也就是创建SocketChannel并将其注册OP_READ事件。

二、构建连接过程

一个新的连接建立会触发ServerSocketChannel上的OP_ACCEPT事件,也就是EventLoop中对OP_ACCEPT和OP_READ事件的处理:

// 处理OP_READ or OP_ACCEPT
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

如果对EventLoop事件循环处理不熟悉的同学可以参照Netty4.1源码分析—— 服务端启动一文。这里unsafe的实例指的是AbstractNioMessageChannel类里的内部类NioMessageUnfafe类,因为其read方法比较长,所以我们分步骤来分析。

2.1 创建SocketChannel

在read方法中其doReadMessages方法是用来创建SocketChannel的,其源码如下:

protected int doReadMessages(List buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        //...
    }

    return 0;
}
 

从代码中看出,该方法接收一个buf数组,返回1代表创建成功,返回0代表失败。buf数组在这一步是用来存放已创建好的SocketChannel的,buf数组在后续广播read事件时会用到。而创建SocketChannel调用的是SocketUtils的accept方法:

public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction() {
            @Override
            public SocketChannel run() throws IOException {
                return serverSocketChannel.accept();
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

可以看出,创建SocketChannel是ServerSocketChannel的accept方法,其内部调用的是NIO的NATIVE方法来构建SocketChannel实例。

2.2 DefaultMaxMessagesRecvByteBufAllocator类

在read方法中,allocHandle实例指的是DefaultMaxMessagesRecvByteBufAllocator类的内部类MaxMessageHandle类。从名字不难看出,该Allocator类是为了处理读取(包含接受连接)事件时的一个处理类,在一次读取事件时,可能存在读取数据较少,接收缓冲区还能读取更多的情况。DefaultMaxMessagesRecvByteBufAllocator类就是为了处理这些情况而诞生的,其默认一次读取尽可能的读取满16次。首先看其几个重要方法:

2.2.1 incMessagesRead方法
public final void incMessagesRead(int amt) {
    totalMessages += amt;
}

该方法是为了记录总共的读取次数totalMessages变量。

2.2.2 continueReading方法
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
    @Override
    public boolean get() {
        return attemptedBytesRead == lastBytesRead;
    }
};

public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return config.isAutoRead() &&
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           totalMessages < maxMessagePerRead &&
           totalBytesRead > 0;
}

continueReading方法是确认是否此次READ或者ACCEPT事件还能继续读取的判断。其判断条件有四个:

  • 配置里自动读取打开。
  • 尝试读取的字节和最后读取的字节数相等。
  • 读取的总次数小于最大读取次数(16次)。
  • 读取的总字节数大于0。

当上述条件全部满足时,才会进行下一次的读取。而在处理ACCEPT事件时,因为总读取的字节数始终为0,所以在此处只会进行doReadMessages一次。

2.2.3 lastBytesRead方法
lastBytesRead = bytes;
if (bytes > 0) {
    totalBytesRead += bytes;
}

该方法是记录上一次读取的字节数,该方法在读取数据时会用到,在构建连接时没有用到。

2.2.4 attemptedBytesRead方法
public void attemptedBytesRead(int bytes) {
    attemptedBytesRead = bytes;
}

该方法是记录尝试读取的字节数目attemptedBytesRead变量,同样也只在读取数据时会用到。了解完上述知识,再看read方法前半段就轻而易举了:

private final List readBuf = new ArrayList();

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    //得到ServerSocketChannel的pipline
    final ChannelPipeline pipeline = pipeline();
    // 新建一个MaxMessageHandle类
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // 生成一个SocketChannel
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                // 记录读取的次数
                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }
    } finally {
        //.....
    }
}
 
2.3 如何初始化SocketChannel? 

在Netty4.1源码分析—— 服务端启动一文中,可以了解到ServerSocketChannel所绑定的handler类中有一个ServerBootstrapAcceptor类专门来初始化SocketChannel,那么怎么将SocketChannel传递到ServerBootstrapAcceptor类中呢?这就要分析一下read方法的后半段了:

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
    readPending = false;
    // 在这一步将SocketChannel传播出去,让Acceptor接收到进行初始化
    pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

在此处,通过ServerSocketChannel所绑定的pipline通过广播channelRead事件,将SocketChannel广播出去,此时的pipline结构如图:


也就是在此处会将SocketChannel通过channelRead事件广播到ServerBootstrapAcceptor类中的channelRead方法里:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // next 一个EventLoop绑定
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

关于EventLoop和SocketChannel如何绑定的可以阅读Netty4.1源码分析—— 服务端启动一文中的3.1.4章节,其内容和ServerSocketChannel绑定过程一致。区别的是,虽然两者都是最终调用了io.netty.channel.AbstractChannel.AbstractUnsafe类的register0方法,也是先利用doRegister方法只注册上该channel感兴趣的事件集为0。区别在于ServerSocketChannel是利用doBin0方法来完成注册OP_ACCEPT事件的,而此时SocketChannel在该方法执行时就已经被激活了,所以直接走的pipline.channelActive事件来完成注册OP_READ事件:

private void register0(ChannelPromise promise) {
    try {
        //...
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        
        pipeline.invokeHandlerAddedIfNeeded();
        // 设置promise success, 注册ServerSocketChannel那么这里就是调用doBind0
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // 注册ServerSocketChannel在这步没有激活,不会往下走
        // 注册socketChannel时已被激活,所以继续执行
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } 
    //....
}

而在最后注册OP_READ事件时,也是和ServerSocketChannel一致,最终调用的是AbstractNioChannel的deBeginRead方法,只不过此处的interestOps值为1,即代表着OP_READ事件:

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // OP_ACCEPT = 1 << 4 = 16/OP_READ = 1 <<0 = 1
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
三、总结
  1. 利用Selector的select方法监听到OP_ACCEPT事件,调用AbstractNioMessageChannel类里的内部类NioMessageUnfafe类的read方法来处理该OP_ACCEPT事件。
  2. 采用ServerSocketChannel的accept方法创建一个SocketChannel。
  3. 通过ServerSocketChannel的pipline广播channelRead事件将SocketChannel传递至ServerBootstrapAcceptor类中channelRead方法中进行初始化和EventLoop绑定。
  4. 初始化的时候调用NIO的register方法将SocketChannel注册进Selector,注意此处设置SocketChannel的感兴趣事件集为0。
  5. 最终注册OP_READ事件集是通过pipline广播channelActive消息来完成的,OP_READ事件代表值为1。
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号