1、启动流程
Netty启动流程可以简化成如下代码
// netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector Selector selector = Selector.open(); // 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config NioServerSocketChannel attachment = new NioServerSocketChannel(); // 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 启动 nio boss 线程执行接下来的操作 //注册(仅关联 selector 和 NioServerSocketChannel),未关注事件 SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment); // head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor // 绑定端口 serverSocketChannel.bind(new InetSocketAddress(8080)); // 触发 channel active 事件,在 head 中关注 op_accept 事件 selectionKey.interestOps(SelectionKey.OP_ACCEPT);
- 获得选择器Selector,Netty中使用NioEventloopGroup中的NioEventloop封装了线程和选择器
- 创建NioServerSocketChannel,该Channel作为附件添加到ServerSocketChannel中
- 创建ServerSocketChannel,将其设置为非阻塞模式,并注册到Selector中,此时未关注事件,但是添加了附件NioServerSocketChannel
- 绑定端口
- 通过interestOps设置感兴趣的事件
bind
选择器Selector的创建是在NioEventloopGroup中完成的。NioServerSocketChannel与ServerSocketChannel的创建,ServerSocketChannel注册到Selector中以及绑定操作都是由bind方法完成的
所以服务器启动的入口便是io.netty.bootstrap.ServerBootstrap.bind
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
doBind
真正完成初始化、注册以及绑定的方法是io.netty.bootstrap.AbstractBootstrap.doBind
dobind方法在主线程中执行
private ChannelFuture doBind(final SocketAddress localAddress) {
// 负责NioServerSocketChannel和ServerSocketChannel的创建
// ServerSocketChannel的注册工作
// init由main线程完成,regisetr由NIO线程完成
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 因为register操作是异步的
// 所以要判断主线程执行到这里时,register操作是否已经执行完毕
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 执行doBind0绑定操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
// 如果register操作还没执行完,就会到这个分支中来
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 添加监听器,NIO线程异步进行doBind0操作
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
-
doBind()中有两个重要方法initAndRegister()和doBind0(regFuture, channel, localAddress, promise)
-
initAndRegister主要负责NioServerSocketChannel和ServerSocketChannel的创建(主线程中完成)与ServerSocketChannel注册(NIO线程中完成)工作
-
doBind0则负责连接的创建工作
initAndRegisterd
代码
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
init
Channel channel = null;
try {
// 通过反射初始化NioServerSocketChannel
channel = channelFactory.newChannel();
init(channel);
}
newChannel方法
@Override
public T newChannel() {
try {
// 通过反射调用NioServerSocketChannel的构造方法
// 创建NioServerSocketChannel对象
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
newSocket方法
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// ServerSocketChannel.open方法:
// SelectorProvider.provider().openServerSocketChannel()
// 所以此处相当于ServerSocketChannel.open()
// 创建了ServerSocketChannel实例
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
}
}
init方法
@Override
void init(Channel channel) {
...
// NioSocketChannl的Pipeline
ChannelPipeline p = channel.pipeline();
...
// 向Pipeline中添加了一个handler,该handler等待被调用
p.addLast(new ChannelInitializer() {
@Override
// register之后才调用该方法
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 创建handler并加入到pipeline中
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加新的handler,在发生Accept事件后建立连接
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init主要完成了以下三个操作
- 创建NioServerSocketChannel
- 通过NioServerSocketChannel的构造器,创建了ServerSocketChannel
- 由initChannel方法向NioServerSocketChannel中添加了两个handler,添加操作在register之后被执行
- 一个handler负责设置配置
- 一个handler负责发生Accepet事件后建立连接
Register
init执行完毕后,便执行ChannelFuture regFuture = config().group().register(channel)操作
该方法最终调用的是promise.channel().unsafe().register(this, promise)方法
promise.channel().unsafe().register(this, promise)
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 获取EventLoop
AbstractChannel.this.eventLoop = eventLoop;
// 此处完成了由 主线程 到 NIO线程 的切换
// eventLoop.inEventLoop()用于判断当前线程是否为NIO线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 向NIO线程中添加任务
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 该方法中会执行doRegister
// 执行真正的注册操作
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
register0方法
private void register0(ChannelPromise promise) {
try {
...
// 执行真正的注册操作
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 调用init中的initChannel方法
pipeline.invokeHandlerAddedIfNeeded();
...
} catch (Throwable t) {
...
}
}
doRegister方法
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// javaChannel()即为ServerSocketChannel
// eventLoop().unwrappedSelector()获取eventLoop中的Selector
// this为NIOServerSocketChannel,作为附件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
回调initChannel
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 添加新任务,任务负责添加handler
// 该handler负责发生Accepet事件后建立连接
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
Register主要完成了以下三个操作
-
完成了主线程到NIO的线程切换
- 通过eventLoop.inEventLoop()进行线程判断,判断当前线程是否为NIO线程
- 切换的方式为让eventLoop执行register的操作
- register的操作在NIO线程中完成
-
调用doRegister方法
// javaChannel()即为ServerSocketChannel // eventLoop().unwrappedSelector()获取eventLoop中的Selector // this为NIOServerSocketChannel,作为附件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
- 将ServerSocketChannel注册到EventLoop的Selector中
- 此时还未关注事件
- 添加NioServerSocketChannel附件
-
通过invokeHandlerAddedIfNeeded调用init中的initChannel方法
- initChannel方法主要创建了两个handler
- 一个handler负责设置配置
- 一个handler负责发生Accept事件后建立连接
- initChannel方法主要创建了两个handler
doBind0
绑定端口
在doRegister和invokeHandlerAddedIfNeeded操作中的完成后,会调用safeSetSuccess(promise)方法,向Promise中设置执行成功的结果。此时doBind方法中由initAndRegister返回的ChannelFuture对象regFuture便会由NIO线程异步执行doBind0绑定操作
// initAndRegister为异步方法,会返回ChannelFuture对象
final ChannelFuture regFuture = initAndRegister();
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 如果没有异常,则执行绑定操作
doBind0(regFuture, channel, localAddress, promise);
}
}
});
doBind0最底层调用的是ServerSocketChannel的bind方法
NioServerSocketChannel.doBind方法
通过该方法,绑定了对应的端口
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 调用ServerSocketChannel的bind方法,绑定端口
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
关注事件
在绑定端口操作完成后,会判断各种所有初始化操作是否已经完成,若完成,则会添加ServerSocketChannel感兴趣的事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
最终在AbstractNioChannel.doBeginRead方法中,会添加ServerSocketChannel添加Accept事件
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// 如果ServerSocketChannel没有关注Accept事件
if ((interestOps & readInterestOp) == 0) {
// 则让其关注Accepet事件
// readInterestOp 取值是 16
// 在 NioServerSocketChannel 创建时初始化
selectionKey.interestOps(interestOps | readInterestOp);
}
}
注意:此处设置interestOps时使用的方法,避免覆盖关注的其他事件
-
首先获取Channel所有感兴趣的事件
final int interestOps = selectionKey.interestOps();
-
然后再设置其感兴趣的事件
selectionKey.interestOps(interestOps | readInterestOp);
各个事件对应的值
总结
通过上述步骤,完成了
- NioServerSocketChannel与ServerSocketChannel的创建
- ServerSocketChannel绑定到EventLoop的Selecot中,并添加NioServerSocketChannel附件
- 绑定了对应的端口
- 关注了Accept事件



