- 一、引言与结论
- 二、构建连接过程
- 2.1 创建SocketChannel
- 2.2 DefaultMaxMessagesRecvByteBufAllocator类
- 2.2.1 incMessagesRead方法
- 2.2.2 continueReading方法
- 2.2.3 lastBytesRead方法
- 2.2.4 attemptedBytesRead方法
- 2.3 如何初始化SocketChannel?
- 三、总结
一、引言与结论本文只代表笔者一人的理解和叙述,笔者功力尚浅,如有错误,还请各位大神斧正。
阅读本篇文章前需阅读:
Netty4.1源码分析—— 服务端启动
从Netty4.1源码分析—— 服务端启动分析得出,启动过程的实质是创建ServerSocketChannel并将该channel注册上OP_ACCEPT事件,说到底就是为即将到来的socket连接做好准备。
不同的socketChannel承担着不同的责任,我们在Netty中常说的连接则是指的是服务端和客户端的SocketChannel,客户端和服务端的SocketChannel是一对一的关系,所以ServerSocketChannel所做的准备实质上是为了创建和初始化SocketChannel所做的准备。
当我们分析某项事物所做的一系列动作的时候,不妨先来了解下它的最终目的。客户端和服务端构建完连接后,下一步的目的是开始接收/发送数据了,所以服务端构建连接其实也是在为下一步做准备——为接收数据做准备,也就是创建SocketChannel并将其注册OP_READ事件。
二、构建连接过程一个新的连接建立会触发ServerSocketChannel上的OP_ACCEPT事件,也就是EventLoop中对OP_ACCEPT和OP_READ事件的处理:
// 处理OP_READ or OP_ACCEPT
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
如果对EventLoop事件循环处理不熟悉的同学可以参照Netty4.1源码分析—— 服务端启动一文。这里unsafe的实例指的是AbstractNioMessageChannel类里的内部类NioMessageUnfafe类,因为其read方法比较长,所以我们分步骤来分析。
2.1 创建SocketChannel在read方法中其doReadMessages方法是用来创建SocketChannel的,其源码如下:
protected int doReadMessages(List
从代码中看出,该方法接收一个buf数组,返回1代表创建成功,返回0代表失败。buf数组在这一步是用来存放已创建好的SocketChannel的,buf数组在后续广播read事件时会用到。而创建SocketChannel调用的是SocketUtils的accept方法:
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
可以看出,创建SocketChannel是ServerSocketChannel的accept方法,其内部调用的是NIO的NATIVE方法来构建SocketChannel实例。
2.2 DefaultMaxMessagesRecvByteBufAllocator类在read方法中,allocHandle实例指的是DefaultMaxMessagesRecvByteBufAllocator类的内部类MaxMessageHandle类。从名字不难看出,该Allocator类是为了处理读取(包含接受连接)事件时的一个处理类,在一次读取事件时,可能存在读取数据较少,接收缓冲区还能读取更多的情况。DefaultMaxMessagesRecvByteBufAllocator类就是为了处理这些情况而诞生的,其默认一次读取尽可能的读取满16次。首先看其几个重要方法:
2.2.1 incMessagesRead方法public final void incMessagesRead(int amt) {
totalMessages += amt;
}
该方法是为了记录总共的读取次数totalMessages变量。
2.2.2 continueReading方法private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptedBytesRead == lastBytesRead;
}
};
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
continueReading方法是确认是否此次READ或者ACCEPT事件还能继续读取的判断。其判断条件有四个:
- 配置里自动读取打开。
- 尝试读取的字节和最后读取的字节数相等。
- 读取的总次数小于最大读取次数(16次)。
- 读取的总字节数大于0。
当上述条件全部满足时,才会进行下一次的读取。而在处理ACCEPT事件时,因为总读取的字节数始终为0,所以在此处只会进行doReadMessages一次。
2.2.3 lastBytesRead方法lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
}
该方法是记录上一次读取的字节数,该方法在读取数据时会用到,在构建连接时没有用到。
2.2.4 attemptedBytesRead方法public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
该方法是记录尝试读取的字节数目attemptedBytesRead变量,同样也只在读取数据时会用到。了解完上述知识,再看read方法前半段就轻而易举了:
private final List2.3 如何初始化SocketChannel?
在Netty4.1源码分析—— 服务端启动一文中,可以了解到ServerSocketChannel所绑定的handler类中有一个ServerBootstrapAcceptor类专门来初始化SocketChannel,那么怎么将SocketChannel传递到ServerBootstrapAcceptor类中呢?这就要分析一下read方法的后半段了:
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 在这一步将SocketChannel传播出去,让Acceptor接收到进行初始化
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
在此处,通过ServerSocketChannel所绑定的pipline通过广播channelRead事件,将SocketChannel广播出去,此时的pipline结构如图:
也就是在此处会将SocketChannel通过channelRead事件广播到ServerBootstrapAcceptor类中的channelRead方法里:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// next 一个EventLoop绑定
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
关于EventLoop和SocketChannel如何绑定的可以阅读Netty4.1源码分析—— 服务端启动一文中的3.1.4章节,其内容和ServerSocketChannel绑定过程一致。区别的是,虽然两者都是最终调用了io.netty.channel.AbstractChannel.AbstractUnsafe类的register0方法,也是先利用doRegister方法只注册上该channel感兴趣的事件集为0。区别在于ServerSocketChannel是利用doBin0方法来完成注册OP_ACCEPT事件的,而此时SocketChannel在该方法执行时就已经被激活了,所以直接走的pipline.channelActive事件来完成注册OP_READ事件:
private void register0(ChannelPromise promise) {
try {
//...
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
// 设置promise success, 注册ServerSocketChannel那么这里就是调用doBind0
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// 注册ServerSocketChannel在这步没有激活,不会往下走
// 注册socketChannel时已被激活,所以继续执行
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
}
//....
}
而在最后注册OP_READ事件时,也是和ServerSocketChannel一致,最终调用的是AbstractNioChannel的deBeginRead方法,只不过此处的interestOps值为1,即代表着OP_READ事件:
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// OP_ACCEPT = 1 << 4 = 16/OP_READ = 1 <<0 = 1
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
三、总结
- 利用Selector的select方法监听到OP_ACCEPT事件,调用AbstractNioMessageChannel类里的内部类NioMessageUnfafe类的read方法来处理该OP_ACCEPT事件。
- 采用ServerSocketChannel的accept方法创建一个SocketChannel。
- 通过ServerSocketChannel的pipline广播channelRead事件将SocketChannel传递至ServerBootstrapAcceptor类中channelRead方法中进行初始化和EventLoop绑定。
- 初始化的时候调用NIO的register方法将SocketChannel注册进Selector,注意此处设置SocketChannel的感兴趣事件集为0。
- 最终注册OP_READ事件集是通过pipline广播channelActive消息来完成的,OP_READ事件代表值为1。



