- 探究源码
- 启动流程
- nio启动流程
- 概述
- init
- Register
- doBind0()
- EventLoop
- Selector何时被创建
- 两个Selector成员变量
- EventLoop的nio线程何时被启动
- 提交普通任务会不会结束Selector阻塞
- wakeup()方法
- 何时进入Select分支进行阻塞
- 会阻塞多久
- nio空轮询bug
- EventLoop---ioRatio
- 执行io事件,在哪进行事件判断
- accept流程
- read流程
因为netty的底层使用的是nio,所以先回忆一下nio的启动流程对于接下来要探究的netty启动流程也是有好处的。
-
创建一个选择器,监听多个channel发生的各类事件
Selector selector = Selector.open();
-
创建一个ServerSocketChannel,并且设置非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false);
-
将serverSocketChannel注册进选择器中
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
这是jdk原生的ServerSocketChannel,将来如果selector发生了事件,会将这个事件交给Nio相应的类去处理,这里就使用到了attachment附件,通过附件将serverSocketChannel与NioServerSocketChannel进行绑定。
NioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel(); SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
-
绑定端口
serverSocketChannel.bid(new InetSocketAddress(8080));
-
在selectionKey上注册一个它关心的事件类型
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
上面nio的五个步骤是如何在netty中实现的?
下方代码是使用netty创建一个服务器的基本步骤
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringDecoder());
}
})
.bind(8080);
我们知道EventLoop包含了一个Selector和一个单线程执行器 ,也就是说.group(new NioEventLoopGroup()) 这行语句可以看为是完成Nio的第一步创建一个选择器的。
Nio剩下的四个步骤其实都是在.bind(8080); 这行语句完成的,然后我们点进bind()方法,接着会进入到第一个比较重要的方法doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
// initAndRegister()方法 所做的事情就是初始化和注册,相当于上面Nio的第二步和第三步
// 它会将ServerSocketChannel创建好后注册进Selector中。该方法返回一个Future对象,就说明该方法是异步的,
final ChannelFuture regFuture = this.initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
// 这里就会利用future对象调用isDone()进行判断,如果上面initAndRegister()方法干的活比较快,就会执行if语句,
// 但是一般情况下initAndRegister()方法中的nio线程将ServerSocketChannel和Selector进行绑定会比较慢 会执行else语句
} else if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// doBind0()方法是相当于Nio的第四步 绑定端口,监听事件
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
// 从这里可以看出future对象采用了异步的方式执行下面的语句,下方的doBind0()方法也就不是主线程调用了
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
// 进入到else语句后会在这里执行doBind0()方法
AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
在正式开始之前需要了解ServerBootstrap.bind(8080);是主线程调用的,然后进入到doBind()方法 在进入到initAndRegister()方法中,直到创建ServerSocketChannel都是主线程做的事,包括register的前一部分都是主线程,但是在Register中会启动Nio线程,后续的操作就不是在主线程中执行了,ServerSocketChannel注册进Selector中都是Nio线程做的事,如下图所示:
概述需要了解的就几件事
- init是创建ServerSocketChannel
- Register是将ServerSocketChannel注册进Selector中的,是nio线程执行的
- initAndRegister()会返回一个future对象,然后使用该对象进行if判断,一般情况下都会进入到else语句
- else语句中会利用future的异步方式,通过nio线程来执行doBind0()方法
init
接下来详细了解initAndRegister()方法中的init部分。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 这行就是创建一个channel,创建的就是NioServerSocketChannel。
// 再点进newChannel()方法就会发现里面是利用了反射调用无参构造方法获取的对象 constructor.newInstance()
// 这里不仅仅会创建NioServerSocketChannel。还会创建jdk的ServerSocketChannel
channel = this.channelFactory.newChannel();
// 创建NioServerSocketChannel对象后就调用了init()方法,具体方法如下方代码所示
this.init(channel);
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
// 上面的init部分 从这里开始就是register部分了
ChannelFuture regFuture = this.config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
创建NioServerSocketChannel对象后就调用了init()方法
void init(Channel channel) throws Exception {
。。。
ChannelPipeline p = channel.pipeline();
。。。
// 这里就会发现创建NioServerSocketChannel后会往该channel的pipeline中添加一个Handler
// 这个ChannelHandler和其他hander不同的地方在于该handler的initChannel()方法只会执行一次
// 这里执行往pipeline中添加handler哦 还没有到执行的地步哦
p.addLast(new ChannelHandler[]{new ChannelInitializer() {
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = ServerBootstrap.this.config.handler();
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
}
});
}
}});
}
所以initAndRegister()方法中的init部分的作用就是
- 创建了一个NioServerSocketChannel,
- 并往该channel的pipeline中添加了一个Handler。
Register
接下来就轮到了Register部分
final ChannelFuture initAndRegister() {
Channel channel = null;
// init
try {
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
// 上面的init部分 从这里开始就是register部分了
ChannelFuture regFuture = this.config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
首先是register部分的第一行代码ChannelFuture regFuture = this.config().group().register(channel);该方法返回的是有个ChannelFuture对象,那么我们笃定该方法是异步的。然后我们点进该方法,多点几次就会进入到核心
经过的类如下图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8bC8ZvbT-1628604267435)(E:Java笔记pictureimage-20210806125339453.png)]
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
} else if (AbstractChannel.this.isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
} else if (!AbstractChannel.this.isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
} else {
AbstractChannel.this.eventLoop = eventLoop;
// 到目前为止都是主线程在执行,下面这个if就是判断当前线程是否是nio线程,所以自然而然就进入到else语句中
if (eventLoop.inEventLoop()) {
this.register0(promise);
} else {
// 这里做的事情就是将正在做事的register0(promise)方法封装到了任务对象中,然后让eventLoop线程去执行
try {
// 这里第一次调用execute()方法会创建EventLoop线程,然后取执行run()方法,而不是早就创建好线程直接用。
eventLoop.execute(new Runnable() {
public void run() {
AbstractUnsafe.this.register0(promise);
}
});
} catch (Throwable var4) {
。。。
}
}
}
}
现在进入到register0(promise)方法体中
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
return;
}
boolean firstRegistration = this.neverRegistered;
// 在netty源码中,一般do开头的方法就是真正做事的方法,具体功能如下方的代码
AbstractChannel.this.doRegister();
this.neverRegistered = false;
AbstractChannel.this.registered = true;
// doRegister()方法结束后还有下面这个方法也比较重要。我们上面在init的步骤中往channel的pipeline中添加了一个Handler,但是还没有被调用。下面这个方法就是调用init添加的Handler。
// 那个Handler的作用就是又往pipeline中添加另一个Handler,用来当acceptor事件发生后建立连接的
AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
// 在最开始的代码中主线程调用了initAndRegister()方法 返回了一个future对象,然后采用了异步的方式执行一些代码,异步的方法体需要等待Future对象中有数据了才会执行,那么谁来给数据嘞?就是下面这个safeSetSuccess()方法。
// 下面方法体中的promise就是initAndRegister()方法返回的future对象 方法意思就是给这个promise设置一个安全的成功值
// 这里给了结果 initAndRegister()方法返回的Future对象 异步方式的回调方法就会被执行 会执行方法体中的doBind0()方法。 回调方法体如下:
this.safeSetSuccess(promise);
AbstractChannel.this.pipeline.fireChannelRegistered();
if (AbstractChannel.this.isActive()) {
if (firstRegistration) {
AbstractChannel.this.pipeline.fireChannelActive();
} else if (AbstractChannel.this.config().isAutoRead()) {
this.beginRead();
}
}
} catch (Throwable var3) {
。。。
}
}
再进入到doRegister()方法中 这里会进入到AbstractNioChannel类的doRegister()方法
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
// this.javaChannel()就是拿到jdk原生的ServerSocketChannel
// 然后在用jdk原生的ServerSocketChannel调用register()方法,
// 该方法之前参数1需要绑定的Selector就是从我们现在的EventLoop中得到,刚开始没有关注事件,
// 最后的附件this就是NioServerSocketChannel
// 所以下面就和nio时学的serverSocketChannel.register(selector, 0, attachment)一样
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
// 这里的附件NioServerSocketChannel 之前讲过,如果jdk原生的ServerSocketChannel发生了事件,是由NioServerSocketChannel 调用一些对应的方法进行处理。
return;
} catch (CancelledKeyException var3) {
。。
}
}
}
initAndRegister()方法返回的Future对象 等待Future异步回调的方法
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
// 进入到else语句后会在这里执行doBind0()方法
AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
}
}
});
Register做的事情就是:
- 从主线程切换到nio线程
- 将ServerSocketChannel注册进Selector中,附件和NioServerSocketChannel进行绑定
- 再执行了init时添加的Handler,又往pipeline中又添加了一个Handler,用来处理将来发生的accept事件
- 并为promise赋值,也就是initAndRegister()方法返回的Future对象赋值,让future对象能够执行异步方法 在该方法中调用doBind0()方法
doBind0()
经过了initAndRegister()之后,就轮到执行Future异步方式的回调函数中 调用的doBind0()方法了
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
// netty的老套路,保存是EventLoop线程执行
channel.eventLoop().execute(new Runnable() {
public void run() {
if (regFuture.isSuccess()) {
// 继续往下面执行
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
一直往里面点
最后就进入到AbstractChannel类中的bind()方法
public final void bind(SocketAddress localAddress, ChannelPromise promise) {
this.assertEventLoop();
if (promise.setUncancellable() && this.ensureOpen(promise)) {
...
boolean wasActive = AbstractChannel.this.isActive();
try {
// 正真正做事的方法 do开头 它会执行ServerSocketChannel与端口号的绑定
AbstractChannel.this.doBind(localAddress);
} catch (Throwable var5) {
this.safeSetFailure(promise, var5);
this.closeIfClosed();
return;
}
// 在doBind()之后运行,
// 这里if就是判断就是 判断当前的ServerSocketChannel是否可以用了 是否是Active状态
if (!wasActive && AbstractChannel.this.isActive()) {
this.invokeLater(new Runnable() {
public void run() {
// 如果当前channel是Active状态了就会执行下面的语句
// 作用是触发channel中pipeline里面的所有Handler的active事件
AbstractChannel.this.pipeline.fireChannelActive();
}
});
}
this.safeSetSuccess(promise);
}
}
点进doBind()方法 选择 NioServerSocketChannel类中的doBind()方法:
protected void doBind(SocketAddress localAddress) throws Exception {
// 首先判断jdk版本是否大于等于7
if (PlatformDependent.javaVersion() >= 7) {
// this.javaChannel() 就是jdk的ServerSocketChannel,后面指定端口与设置全连接队列的大小
this.javaChannel().bind(localAddress, this.config.getBacklog());
} else {
this.javaChannel().socket().bind(localAddress, this.config.getBacklog());
}
}
doBind()方法执行完后会执行pipeline.fireChannelActive()方法,触发channel中pipeline里面的所有Handler的active事件。当前pipeline中的handler是:head–>acceptor–>tail。后两个Handler即使触发了active也不会做什么事情,主要做事的还是Head这一个Handler ,然后点进pipeline.fireChannelActive()方法
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 真正让SelectorKey关注accept事件是下面的方法执行的
this.readIfIsAutoRead();
}
再往下执行 最后会进入到AbstractNioChannel类的doBeginRead()方法
protected void doBeginRead() throws Exception {
// 这里就是SelectionKey
SelectionKey selectionKey = this.selectionKey;
if (selectionKey.isValid()) {
this.readPending = true;
int interestOps = selectionKey.interestOps();
// 这里首先判断SelectionKey是否已经关注Accept事件,如果没有才会执行下面的方法
if ((interestOps & this.readInterestOp) == 0) {
// 这里的 | 就是+号 this.readInterestOp的值就是16
// nio中 selectionKey.interestOps(SelectionKey.OP_ACCEPT); 这里的SelectionKey.OP_ACCEPT也是16
selectionKey.interestOps(interestOps | this.readInterestOp);
}
}
}
到目前为止,nio中的五个步骤在netty中都执行到了
EventLoop
NioEventloop的重要组成:selector、线程、任务队列。我们现在找到这三个在源码中的位置
// selector
public final class NioEventLoop extends SingleThreadEventLoop {
...
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
}
// 线程和任务队列在NioEventLoop的父类的父类中
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
private final Queue taskQueue; // 任务队列
private volatile Thread thread; // 线程
...
private final Executor executor;// 这个就是一个单线程的线程池 也就是上面的那个线程
// 因为NioEventloop只有一个线程,但我们可能会提交多个任务,单线程同一时刻就只能执行一个任务,多出来的任务就放在任务队列中
// 然后由线程从队列中依次取出任务来执行
}
再进入到曾祖父类AbstractScheduledEventExecutor
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
...
// 处理定时任务的任务队列
PriorityQueue> scheduledTaskQueue;
}
NioEventLoop即会处理io任务,也会处理普通任务,还有定时任务。
Selector何时被创建
在NioEventloop类的构造方法中
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
} else if (strategy == null) {
throw new NullPointerException("selectStrategy");
} else {
this.provider = selectorProvider;
// 这里会调用openSelector()方法
NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
this.selectStrategy = strategy;
}
}
进入到openSelector()
private NioEventLoop.SelectorTuple openSelector() {
final AbstractSelector unwrappedSelector;
try {
// 这里实际上就是创建一个Selector对象
unwrappedSelector = this.provider.openSelector();
} catch (IOException var7) {
throw new ChannelException("failed to open a new selector", var7);
}
。。。
}
以前我们在nio中使用的是Selector.open()方法创建是Selector,这里对比一下这两种方式有什么区别
public abstract class Selector implements Closeable {
protected Selector() { }
public static Selector open() throws IOException {
// 可以看到 nio调用的open()方法内部也是和netty调用的一样的方法
return SelectorProvider.provider().openSelector();
}
。。。
}
Selector何时被创建?
在NioEventLoop的构造方法中被创建。
两个Selector成员变量
为什么在NioEventLoop中会两个Selector成员变量
// selector
public final class NioEventLoop extends SingleThreadEventLoop {
...
// 在源码中也就是下面这两个Selector,各自有什么作用
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
}
在上面创建Selector时,调用nio底层方法provider.openSelector()创建的Selector其实是赋值给了unwrappedSelector。为什么netty还要再加一个Selector嘞?因为在nio原生的Selector会有一个SelectionKeys集合,将来发生了事件我们要从个这里面获取事件的信息。这个集合的实现默认使用的set集合,我们都知道set遍历的性能并不高,因为它的底层是一个hash表,遍历hash表会先去遍历每个hash桶,然后再去遍历每个链表。
因为遍历的性能并不高,所以netty做了这样一个优化,将nio内部SelectionKeys集合给替换掉了,换为了基于数组的实现。
// 具体的实现还是在NioEventloop类的openSelector()方法中
private NioEventLoop.SelectorTuple openSelector() {
final AbstractSelector unwrappedSelector;
try {
// 创建selector
unwrappedSelector = this.provider.openSelector();
} catch (IOException var7) {
throw new ChannelException("failed to open a new selector", var7);
}
。。。
// 内部基于数组的实现
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
。。。
try {
// 这里是利用反射 先拿到Selector的实现类,然后获得私有成员变量
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
。。。
// 使用netty提供的一个反射工具类,将这个私有成员变量 暴力反射 然后可以调用
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
} else {
// 暴力反射
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
} else {
// 反射调用这两个成员变量,然后将原始的Selector对象用netty提供的替换掉
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
}
}
。。。
}
为什么在NioEventLoop中会两个Selector成员变量?
为了在遍历SelectinKeys时提高效率
- private Selector selector; 这个是包装后的 内部SelectinKeys基于数组实现的Selector
- private Selector unwrappedSelector; 这个是原始的Selector
EventLoop的nio线程何时被启动
第一次调用execute()方法时会创建EventLoop线程
EventLoop eventLoop = new NioEventLoopGroup().next();
eventLoop.execute(()->{
System.out.println("第一次调用execute()方法时会创建EventLoop线程");
})
主线程调用execute()方法,底层执行流程如下
public void execute(Runnable task) {
// 首先判断方法的参数 Runnable对象是否为null
if (task == null) {
throw new NullPointerException("task");
} else {
// 然后在判断当前线程是否为nio线程,这里是主线程调用的execute()方法 所以这里会返回fasle
boolean inEventLoop = this.inEventLoop();
this.addTask(task); // 把这个任务加入到任务队列中
if (!inEventLoop) { // false在取反就为true 就会进入到if里面
this.startThread(); // 然后就会执行startThread()方法首次开启这个线程
。。。
}
下面为SingleThreadEventExecutor类的startThread()方法
private void startThread() {
// 这里第一次进来时 state的值为1 所以第一次这里的条件满足 if的第二个条件就是将state从1改为2
// 所以 当以后再次调用该方法 if的条件就不会成立了,只有第一次会成立
if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
boolean success = false;
try {
// 这里就是开启线程 , 如果开启成功就会将success变量变为true,所以findlly语句中也就不会执行了
// 如果开启线程抛异常了,finally语句就又会将state从2变为1
this.doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, 2, 1);
}
}
}
}
我们在点进doStartThread();方法
private void doStartThread() {
// EventLoop中的线程还为null
assert this.thread == null;
// 这里的executor就是单线程的线程池 这里使用这里面的nio线程执行一个任务
this.executor.execute(new Runnable() {
public void run() {
// 执行的任务就是把当前的nio线程赋值为 EventLoop中的thread成员变量。到这里一步,thread也就有值了
SingleThreadEventExecutor.this.thread = Thread.currentThread();
if (SingleThreadEventExecutor.this.interrupted) {
SingleThreadEventExecutor.this.thread.interrupt();
}
...
label1907: {
try {
var112 = true;
// 这里的run方法也比较重要,它做的事是 一个死循环, 不断的去找任务、定时任务、io事件去执行
SingleThreadEventExecutor.this.run();
...
}
}
}
}
EventLoop的nio线程何时被启动
首次调用execute方法时启动,重复调用该方法也不会启动多次线程,因为底层有一个if判断,第一次启动有一个状态为statu值为1,当线程启动成功后就会将值变为2,所以再次调用该方法if判断也不会成立。
提交普通任务会不会结束Selector阻塞
上面说过,首次调用execute方法启动nio线程,还会调用一个run()方法,启动一个死循环,代码如下
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// 当有事件发生的时候就会调用下方的select()方法
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
。。。
在点进select()方法
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
。。。
// 这里就是nio时我们见过的阻塞方法,避免线程一直在运行死循环,和我们使用时不一样的是这里不是调用的无参的select()方法
// netty调用的是有参的select()方法,当这里面的超时时间到了之后就会解除阻塞,之所以采用有参的select()方法
// 这是因为EventLoop不仅仅要处理io事件,还要处理一些其他任务,它不能一直阻塞。
// 这里当超时时间到了会解除阻塞,或者是有新任务提交也会唤醒它 以便及时的处理io事件之外的任务
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
当有普通任务提交了,nio线程这里的超时时间还没到这里还是阻塞的,那到底是如何唤醒这里的嘞?
这是因为当主线程调用execute()方法,然后再从里面调用startThread()方法,该方法会判断status是否值为1,也就是首次被调用,然后在调用doStartThread()方法,这个方法里面就会启动Executor单线程池里面的nio线程,主线程到这暂时结束,nio线程为thread赋值,然后调用我们这里的死循环run()方法,再进入switch分支调用一个方法 进而导致selector.select(timeoutMillis)阻塞。
当有普通任务了,通过eventLoop.execute(()->{..})添加普通任务,主线程又会执行一次execute()方法,又会调用startThread()方法,这时已经不是第一次调用该方法了所以不会做什么事情,然后startThread()方法执行完后接着执行execute()方法会调用一个wakeup()方法来唤醒nio阻塞的线程去执行普通任务。
提交普通任务会不会结束Selector阻塞
主线程会调用wakeup()方法唤醒阻塞的nio线程来执行普通任务。
wakeup()方法
通过上面引出了wakeup()方法,这里详细介绍该方法的执行条件
在NioEventLoop类中
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && this.wakenUp.compareAndSet(false, true)) {
this.selector.wakeup();
}
}
首先这里对if判断条件进行解读,前面一部分是表示只有EventLoop线程之外的线程提交任务才有机会执行wakeup()方法。如果的EventLoop线程自己提交的任务就会走其他的逻辑。wakenUP变量在定义时的类型是AtomicBoolean,它是采用cas的方式去设置值,多个线程在同一个时刻修改它的值只会有一个成功。它的作用是什么?
因为this.selector.wakeup();是一个比较耗费性能的操作,所以我们应该避免对它的频繁调用。将来会有这样一个情况,有多个线程都来提交任务,都走到了上面这个地方,那么selector需要唤醒几次嘞,其实1次就足够了。所以就用到了wakenUp这个原子变量。
何时进入Select分支进行阻塞
在上面我们知道主线程首次调用eventLoop.execute(Runnable run) —>调用startThread()方法 —>判断statu是否为1 是否为首次调用execute()方法 —>调用doStartThread()方法 nio线程接手主线程运行—>SingleThreadEventExecutor.this.run() —> 死循环
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 这里才会让EventLoop线程 selector.select(timeoutMillis) 进入阻塞
// 那么什么条件下才会进入到这条分支嘞?
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
。。。
}
决定进入select分支的条件就是switch中的代码selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())。
点进方法:
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// 这里会根据上面传递过来的boolean变量决定,如果为false时就会走select阻塞分支
// 这个Boolean变量的作用就是当前是否有任务,如果没有任务就会进入select阻塞分支
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
如果有任务 selectSupplier.get()又会有什么作用。get()方法中又调用了selectNow()方法
int selectNow() throws IOException {
try {
// 之前nio讲select()方法时讲了三个使用,一个是空参的select,第二个就是带超时时间的select 第三个就是selectNow()
// 该方法与前两个不同的地方是它不会阻塞,它会在selector上立刻查看是否有事件发生,如果没有就返回0
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
所以,当没有任务时,才会进入Select分支,进行阻塞。如果有任务时 最终会调用selectNow()方法返回当前的任务数,并跳出switch多分支语句,执行switch下面的代码来处理任务。
会阻塞多久
当没有事件发生时,进入了select分支,最终执行selector.select(timeoutMillis); 那么这个超时时间为多久嘞,源码如下:
long currentTimeNanos = System.nanoTime(); // 当前时间
// 截止时间=当前时间+1秒 如果有定时任务 截止时间=当前时间+下一个定时任务开始的事件
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 超时时间 这里又减去了当前时间,后面又加上0.5毫秒,最后的是将纳秒转换为毫秒
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
。。。
}
所以当没有定时任务的情况下selector.select(timeoutMillis); 只会阻塞1秒左右。
那么什么时候会跳出select分支进入的select()方法的死循环
- 当前时间超过了截止时间,因为每一次循环都会重新为当前时间变量赋值
- 有任务发生了也会退出死循环
- 有事件发生了也会推出死循环
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime(); // 当前时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // 截止时间
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 当前时间超过了截止时间 会退出死循环
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// hasTasks()是否有任务
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 阻塞 当有事件发生会解除阻塞 selectedKeys也不为0
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 所以又事件发生这里也会退出死循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
。。。
long time = System.nanoTime();
。。。
currentTimeNanos = time;
}
// for end
。。
} catch (CancelledKeyException e) {
..
}
}
nio空轮询bug
nio中selector.select()方法有一个bug(jdk在linux环境下),本来正常情况下如果是无参的select()方法,只有在有事件发生时才会解除阻塞,如果是有超时时间的select()方法,没有事件发生需要等到超时时间才会解除阻塞。而这个bug有很小的几率会出现,那就是即使没有事件发生,超时时间也没到,也不会阻塞,特别是当好几个EventLoop线程都空轮询这就会很占用CPU资源。
netty解决了nio的空轮询bug,它解决的方法是采用一个循环计数的方式
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// 就是这个selectCnt变量 初始值为0
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 阻塞 当有事件发生会解除阻塞 selectedKeys也不为0
int selectedKeys = selector.select(timeoutMillis);
// 每循环一次就会让计数++
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
。。。
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
// 这里首先判断有没有设置一个 期望值 大于0 并且如果 selectCnt大于了这个期望值就会退出死循环
// 这个期望值默认 会读取运行时的环境变量io.netty.selectorAutoRebuildThreshold
// 如果我们自己设置了值就以我们设置的为准,如果没有设置默认是512。
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 当发生了空轮询bug如果出现了就会调用下面的方法,作用是重新创建一个selector,替换掉旧的selector,
// 还会把旧的selector中的一些信息赋值个新的,内部的实现还是比较复杂的。
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
// for end
。。
} catch (CancelledKeyException e) {
..
}
}
EventLoop—ioRatio
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
// 当阻塞解除后,就会继续执行switch语句下面的代码来处理任务或者是发生的事件
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
// 如果ioRatio的值设置为100 就会先运行所有的io事件,然后在运行所有的普通任务
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {// else 会做两件事
// 首先获取当前时间
final long ioStartTime = System.nanoTime();
try {
// 1. 处理所有的io事件
processSelectedKeys();
} finally {
// 处理完io事件后在获取一次当前时间,在和之前的事件相减,得到处理io事件的时间
final long ioTime = System.nanoTime() - ioStartTime;
// 2. 运行普通任务,普通任务能执行的事件就是 ioTime * (100 - ioRatio) / ioRatio 来控制的
// 当普通任务运行的时间超过了这里传递的事件 它就不会从任务队列中拿普通任务执行了
// 会等下一次循环 处理完io事件后再来执行普通任务,这样就实现了优先处理io事件
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
。。
}
}
如果普通任务的耗时比较长,那势必会影响到io事件的执行,毕竟EventLoop中只是一个单线程,netty为了避免因为普通任务的耗时较长影响到io事件,netty会做一个参数的控制——ioRatio 这个参数是控制处理io事件所占用的事件比例,它默认是50%
执行io事件,在哪进行事件判断
就从上面的源码中 ioRatio 参数的判断中 执行所有的io事件的方法processSelectedKeys(); 跟进这个方法,就会进入到下面的方法中
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
// 这里首先是拿到所有的SelectionKey
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
// 然后获取附件,也就是NioServerSocketChannel ,这里为什么要拿到这个对象嘞,
// 因为接下来要对SelectionKey进行各种各样的处理,也就是Handler,所以需要通过channel得到pipeline 在得到Handler
final Object a = k.attachment();
// 拿到channel了就进行判断是否是NioChannel
if (a instanceof AbstractNioChannel) {
//然后就会进入到这个方法
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask task = (NioTask) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
processSelectedKey(k, (AbstractNioChannel) a);方法的源码,就是在这里面进行各类事件的判断,然后进行相应的处理
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// 可连接事件 客户端的事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 可读事件和连接事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
accept流程
首先会议nio中accept的流程
- selector.select()阻塞
- 遍历SelectionKeys
- 判断事件类型
- 创建SocketChannel
- 注册进Selector中
- 利用SelectionKey监听read事件
其实前面三步已经在上面学习EventLoop的源码中已经实现了,接下来重点关注后面的三步
在processSelectedKey(k, (AbstractNioChannel) a);方法中进行事件判断
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
...
try {
int readyOps = k.readyOps();
// 可连接事件 客户端的事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 可读事件和连接事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
当服务器启动,客户端启动就会进入到unsafe.read();方法中
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 这里就会创建SocketChannel 并将它设置为非阻塞的 , 这里的readBuf就是创建的NioSocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 这里就是把刚刚建立的连接当成一个消息 给pipeline中的Handler去处理。
// 目前的handler共有:head--> accept ---> tail,所以这里肯定是accept这个handler来进行处理
// 其实接下来的两步 将SocketChannel注册进select并且监听read事件都是这个handler做的事
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
。。
} finally {
。。。
}
}
创建SocketChannel 并将它设置为非阻塞的方法
protected int doReadMessages(List
接下来就不一步一步走了,最终它会到accept 这个Handler 中的方法中。最后会进入到ServerBootstrap类中的静态内部类ServerBootstrapAcceptor中的channelRead()方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这里的msg就是上面创建的NioSocketChannel
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
// 这里会对NioSocketChannel 设置一些参数
setChannelOptions(child, childOptions, logger);
for (Entry, Object> e: childAttrs) {
child.attr((AttributeKey
接着在进入register(child)方法,最终会进入到AbstractChannel类的register()方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
} else if (AbstractChannel.this.isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
} else if (!AbstractChannel.this.isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
} else {
AbstractChannel.this.eventLoop = eventLoop;
// 这里会把当前线程和EventLoop线程进行判断,现在的线程虽然是EventLoop线程,但还是会进入到else语句中
// 这里因为目前的线程是NioServerSocketChannel的EventLoop线程,但是线程新建了一个NioSocketChannel,这两个channel肯定不能共用一个线程,所以就会进入到else中新开一个线程来执行register0(promise)方法
if (eventLoop.inEventLoop()) {
this.register0(promise);
} else {
try {
// 这里会拿到新的EventLoop,用新的EventLoop中的线程
eventLoop.execute(new Runnable() {
public void run() {
// 程序会走到这里。
AbstractUnsafe.this.register0(promise);
}
});
} catch (Throwable var4) {
。。。
}
}
}
}
AbstractUnsafe.this.register0(promise);这行代码最后会调用doRegister()方法,
doRegister()方法就会先拿到jdk原生的SocketChannel,注册进当前EventLoop中的Selector中。并且把当前NioSocketChannel作为附件绑定上去
doRegister()方法结束后 , register0(promise)方法继续运行,调用pipeline.invokeHandlerAddedIfNeeded();方法,其实就是给现在的NioSocketChannel加一个Handler,这些handler就是我们自己在代码中写的 ,这里会把我们写的handler 加到channel中。
register0(promise)方法继续运行, 调用pipeline.fireChannelActive(); 这里的作用就是拿到SelectionKey,然后关注read事件,这最后会在AbstractNioChannel类的doBeginRead()方法中添加关注read事件
read流程
还是在这个地方处理读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
将断电打在这里,当第一次触发断点是连接,直接放行,再一次触发就是read事件了。
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 获取Bytebuf的分配器,决定Bytebuf是池化还非池化
final ByteBufAllocator allocator = config.getAllocator();
// 可以动态调整Bytebuf的大小,并强制使用直接内存
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 这里就是分配具体的Bytebuf
byteBuf = allocHandle.allocate(allocator);
// 客户端发送了数据,服务器端这里调用doReadBytes()方法后救护我那个Bytebuf中填充内容
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 找到当前NioServerSocketChannel上的pipeline,然后触发一个读事件,
// 就是将这个Bytebuf依次传给入站Handler 依次去处理。
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
最后祝大家学有所成,所愿皆所得。



