nio多路复用 :
SelectableChannel
Selector
libs/nio :
NioSelectorGroup
NioSelector
ChannelFactory
ChannelContext
plugins/transport-nio :
NioGroupFactory
NioTransport
每个 NioSelector 由单个线程循环处理处于 ready 状态的 channel:
void singleLoop() {
try {
closePendingChannels();
preSelect();
long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
int ready;
if (nanosUntilNextTask == 0) {
ready = selector.selectNow();
} else {
long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
// only select until the next task needs to be run. Do not select with a value of 0 because
// that blocks without a timeout.
ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1)));
}
if (ready > 0) {
Set selectionKeys = selector.selectedKeys();
Iterator keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey sk = keyIterator.next();
keyIterator.remove();
if (sk.isValid()) {
try {
processKey(sk);
} catch (CancelledKeyException cke) {
eventHandler.genericChannelException((ChannelContext>) sk.attachment(), cke);
}
} else {
eventHandler.genericChannelException((ChannelContext>) sk.attachment(), new CancelledKeyException());
}
}
}
handleScheduledTasks(System.nanoTime());
} catch (ClosedSelectorException e) {
if (isOpen()) {
throw e;
}
} catch (IOException e) {
eventHandler.selectorException(e);
} catch (Exception e) {
eventHandler.uncaughtException(e);
}
}
processKey 处理监听(accept)、连接(connect)以及读写请求:
void processKey(SelectionKey selectionKey) {
ChannelContext> context = (ChannelContext>) selectionKey.attachment();
if (selectionKey.isAcceptable()) {
assert context instanceof ServerChannelContext : "only server channels can receive accept events";
ServerChannelContext serverChannelContext = (ServerChannelContext) context;
int ops = selectionKey.readyOps();
if ((ops & SelectionKey.OP_ACCEPT) != 0) {
try {
eventHandler.acceptChannel(serverChannelContext);
} catch (IOException e) {
eventHandler.acceptException(serverChannelContext, e);
}
}
} else {
assert context instanceof SocketChannelContext : "only sockets channels can receive non-accept events";
SocketChannelContext channelContext = (SocketChannelContext) context;
int ops = selectionKey.readyOps();
if ((ops & SelectionKey.OP_CONNECT) != 0) {
attemptConnect(channelContext, true);
}
if (channelContext.isConnectComplete()) {
if (channelContext.selectorShouldClose() == false) {
if ((ops & SelectionKey.OP_WRITE) != 0) {
handleWrite(channelContext);
}
if (channelContext.selectorShouldClose() == false && (ops & SelectionKey.OP_READ) != 0) {
handleRead(channelContext);
}
}
}
eventHandler.postHandling(channelContext);
}
}



