- 前言
- 原理解析
- 问题扩展
- Netty 体系庞大而复杂,所以会分成几篇文章进行讲解。深入理解需要有一定的基础和耐心,参照源码反复推敲才行
- 为了方便阅读,本篇的源码部分只保留核心部分,比较方便阅读,不重要的部分会删掉,看完整版需要自己打开IDEA哈
- 先上一张自己画的流程图
所有的Netty服务端程序通常都有这句 ServerBootstrap.bind(),它的内部会调用 initAndRegister() 方法,在 AbstractBootstrap.java 找到这段代码:
final ChannelFuture initAndRegister() {
//1.创建 NioServerSocketChannel
Channel channel = channelFactory.newChannel();
//2.ServerBootstrap 相关属性初始化
init(channel);
//3.把 NioServerSocketChannel 注册到 NioEventLoopGroup?
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
init(channel): 对 ServerBootstrap、NioServerSocketChannel、DefaultChannelPipeline进行初始化。同时,往 DefaultChannelPipeline 添加一个默认的 ChannelHandler
他们之间的组成关系如下图所示:
- Channel:对应当前例子的 NioServerSocketChannel
- ChannelPipeline:这里的实际类型是 DefaultChannelPipeline
- ChannelHandler:事件处理器(拦截器),在ChannelPipeline内部以双向链表的结构存储。ChannelPipeline默认有2个Handler,分别用 head(头部)、tail(尾部) 表示.
Pipeline 是怎么添加ChannelHandler,内部都做了哪些事情?
先看一下 init 添加的默认 Handler,这里添加的是 ChannelInitializer 类型。ChannelInitializer是抽象类,实现了ChannelPipeline接口,所以没问题。
ChannelInitializer 重写了 initChannel() 方法,initChannel() 会在ChannelHandler的 handlerAdded 事件触发时被调用。
ChannelInitializer 的职责就是初始化,虽然被添加到ChannelPipeline,但是在initChannel方法执行完毕后,Pipeline会将 ChannelInitializer 移除掉,因为它的初始化任务结束了。
void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
再来看下 addLast() 的代码,下面有附上完整流程图说明:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
上面那段代码有个 executor.inEventLoop(),这里的 executor 实际是 NioEventLoop,inEventLoop()是判断当前执行线程是否 NioEventLoop 创建的线程,如果不是,会启动新的线程。
可以这么理解,NioEventLoopGroup 是多线程的,因为它内部包含了多个 NioEventLoop,而每个 NioEventLoop 是独立的单线程。
流程图的最后一个分支,为什么要判断Pipeline是否注册到EventLoop ?
原因很简单,避免取不到EventLoop对象而出错
config().group(): 返回的是 NioEventLoopGroup 类型,实际上是服务端的 BossGroup 对象。该方法会先找到 NioEventLoopGroup 内部的 NioEventLoop 对象,然后再调用 NioEventLoop 的注册方法。
//MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
继续跟踪 register() 方法,在 AbstractChannel.java 找到内部类 AbstractUnsafe,会执行它里面的 register0() 方法,这个代码比较多一点,下面会分步讲解:
private void register0(ChannelPromise promise) {
try {
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
doRegister(): 实际执行的是NIO代码,将 ServerSocketChannel 注册到 Selector。
- ServerSocketChannel:NioServerSocketChannel 内部有个 SelectableChannel 类型的属性ch,它的实际类型是nio包里面的 ServerSocketChannelImpl
- Selector:在 NioEventLoop 内部就有 Selector 类型的属性
这也证实了Netty就是基于NIO实现的,不了解NIO的可以先翻一下其他文章
pipeline.invokeHandlerAddedIfNeeded(): 执行PendingHandlerAddedTask
还记得前面画的流程图吗?ServerBootstrap初始化的时候会往Pipeline添加一个默认的Handler。因为当时Pipeline还没有注册到EventLoop,所以会在Pipeline内部创建待执行的 PendingHandlerAddedTask,真正执行的时机就是在这里。
safeSetSuccess(promise): 把Promise的状态设置为成功,并通知监听者
问题扩展pipeline.fireChannelRegistered(): 触发InboundChannelHandler的 channelRegistered 事件
问题一: ServerBootstrap初始化的时候,往 Pipeline 加了一个默认Handler,起到了什么效果?
p.addLast(new ChannelInitializer() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); //1 if (handler != null) { pipeline.addLast(handler); } //2 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
这里分成两部分:
-
1:获取声明ServerBootstrap时设置的handler,把它追加到pipeline。如下代码的红框部分,因为我们添加的是ChannelInitializer,所以会触发 initChannel 方法,然后删除该Handler。
-
2:往Pipeline添加一个类型为 ServerBootstrapAcceptor 的 Handler,它的作用主要是处理客户端连接,这部分在后面的文章会进行介绍。
问题二: AbstractNioChannel.doRegister,将 ServerSocketChannel 注册到 Selector 的时候,传递的 interestOps 值为 0,那怎么监听客户端注册连接事件?
看这张图,ServerSocketChannel注册到Selector的时候,传递监听事件的参数是0,表示什么事件都不监听。
在NioServerSocketChannel成功注册到NioEventLoop之后,会调用 safeSetSuccess 标记成功状态,随后通知监听者并执行回调事件 operationComplete
ServerBootstrap在进行初始化的时候,早就添加了监听器和回调事件,在事件内部执行 doBind0() 方法,这边是将 ServerSocketChannel 跟 IP+PORT 进行绑定。
绑定成功后会触发 pipeline 的 channelActive 事件,这里面经过层层跳转,最终又触发 NioServ erSocketChannel 的 doBeginRead 事件。就是在这里,会对 interestOps 进行设置。重点看下面代码的最后一个 if ,判断是否监听 readInterestOp 事件, 没有就追加。
这里又有新的疑问:readInterestOp 的值是哪里来的?只要搞定这个问题就基本OK了。
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
答案在NioServerSocketChannel的构造函数,这里默认设置了参数值为 SelectionKey.OP_ACCEPT
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}



