前面的代码在服务器端都是单线程配合一个 Selector 选择器来管理多个 channel 上的事件
这样有两个缺点
- 现在都是多核CPU,一个线程只能用一个核心,其他的核心会白白浪费
- 单线程是可以处理多个事件,但是如果某个事件耗时较长,就会造成其他事件的等待
如果让你设计一种较为合理的架构,你会怎样设计呢?
首先一点要把多核CPU 充分利用起来,第二就是每个线程对应自己的职责,例如,店小二负责接待,厨师负责炒菜,服务员负责记录菜单
代码实现
服务器
package com.zhao.c1;
import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
// 设置main 名称为 Boss
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 1. 创建固定数量的 work 并初始化
Work work = new Work("work0");
work.register();
while (true) {
// 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞
boss.select();
Iterator iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
// accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}",sc.getRemoteAddress());
// 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)
// 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selector
log.debug("before register...{}",sc.getRemoteAddress());
sc.register(work.selector,SelectionKey.OP_READ,null);
log.debug("after register...{}",sc.getRemoteAddress());
}
}
}
}
static class Work implements Runnable {
private Thread thread; // 独立的线程
private Selector selector; // 独立的监听器
private String name; // work 的名字
private volatile boolean start; // 刚开始线程还未初始化
public Work(String name) {
this.name = name;
}
// 初始化 Thread 和 Selector
public void register() throws IOException {
if (!start) {
thread = new Thread(this, name); // 创建一个新线程,就是当前类
thread.start();
selector = Selector.open();
start = true;
}
}
@Override
public void run() {
while (true){
try {
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
// 读取到了 channel 中的数据
log.debug("read...{}",channel.getRemoteAddress());
channel.read(buffer);
// 切换至读模式
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端
package com.zhao.c1;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class TestClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost",8080));
sc.write(Charset.defaultCharset().encode("1234567890abcdef"));
System.in.read();
}
}
一切看似风平浪静,但是客户端已经向服务器发送了数据,服务器却没有打印,只显示了连接
原因是线程的先后顺序不一致导致的
注册事件
阻塞事件
如果先发生注册事件,再发生阻塞事件,客户端此时发送消息过来,run 方法中是不会再发生阻塞,会把这个事件处理完毕。
如果先发生阻塞事件,再发生注册事件,客户端此时发送消息过来,你的 read 事件还没注册到 Selector 上面,selector.select()会认为你现在没有事件要处理,会一直阻塞。
这个解决起来稍微有一点复杂,因为你想让注册事件发生在阻塞事件前面,阻塞事件是在另一个线程里边,所以得想办法把注册事件也放在这个线程里边,并且先于阻塞事件发生。
这可以通过一个队列来解决,先把我要做的事情存放到一个消息队列里边,执行另一个线程的 run 方法之前,先执行我消息队列的事件,这样一来就解决了 阻塞事件阻塞注册事件的问题。
package com.zhao.c1;
import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentlinkedDeque;
import java.util.concurrent.ConcurrentlinkedQueue;
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
// 设置main 名称为 Boss
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 1. 创建固定数量的 work 并初始化
Work work = new Work("work0");
while (true) {
// 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞
boss.select();
Iterator iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
// accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
// 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)
// 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selector
log.debug("before register...{}", sc.getRemoteAddress());
work.register(sc);
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
}
static class Work implements Runnable {
private Thread thread; // 独立的线程
private Selector selector; // 独立的监听器
private String name; // work 的名字
private volatile boolean start; // 刚开始线程还未初始化
private ConcurrentlinkedQueue queue = new ConcurrentlinkedQueue<>(); // 两个线程通道
public Work(String name) {
this.name = name;
}
// 初始化 Thread 和 Selector
public void register(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this, name); // 创建一个新线程,就是当前类
thread.start();
selector = Selector.open();
start = true;
}
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup(); // 通知阻塞放行
}
@Override
public void run() {
while (true) {
try {
selector.select(); // work0,阻塞,使用 wakeup 唤醒
Runnable task = queue.poll();
if (task != null) {
task.run(); // 执行了 sc.register(selector, SelectionKey.OP_READ, null);
}
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
// 读取到了 channel 中的数据
log.debug("read...{}", channel.getRemoteAddress());
channel.read(buffer);
// 切换至读模式
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
设置多个 Worker,Worker 的个数根据 Cpu 的核数定的,可以用轮询做到
具体实现
创建一个 Workers 数组,容量为 CPU 的核数
创建一个原子 Integer 类
获得workers 数组中的 其中一个元素 = index.getAndIncrement() % works.length
package com.zhao.c1;
import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentlinkedDeque;
import java.util.concurrent.ConcurrentlinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
// 设置main 名称为 Boss
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
Work[] works = new Work[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < works.length; i++) {
works[i] = new Work("worker- "+i);
}
AtomicInteger index = new AtomicInteger();
// 1. 创建固定数量的 work 并初始化
while (true) {
// 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞
boss.select();
Iterator iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
// accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
// 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)
// 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selector
log.debug("before register...{}", sc.getRemoteAddress());
// round robin 轮询
works[index.getAndIncrement() % works.length].register(sc);
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
}
static class Work implements Runnable {
private Thread thread; // 独立的线程
private Selector selector; // 独立的监听器
private String name; // work 的名字
private volatile boolean start; // 刚开始线程还未初始化
private ConcurrentlinkedQueue queue = new ConcurrentlinkedQueue<>(); // 两个线程通道
public Work(String name) {
this.name = name;
}
// 初始化 Thread 和 Selector
public void register(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this, name); // 创建一个新线程,就是当前类
thread.start();
selector = Selector.open();
start = true;
}
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup(); // 通知阻塞放行
}
@Override
public void run() {
while (true) {
try {
selector.select(); // work0,阻塞,使用 wakeup 唤醒
Runnable task = queue.poll();
if (task != null) {
task.run(); // 执行了 sc.register(selector, SelectionKey.OP_READ, null);
}
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
// 读取到了 channel 中的数据
log.debug("read...{}", channel.getRemoteAddress());
channel.read(buffer);
// 切换至读模式
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
NIO vs BIO
stream vs channel
-
stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
-
stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
-
二者均为全双工,即读写可以同时进行
IO 模型
-
stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
-
stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
-
二者均为全双工,即读写可以同时进行
IO 模型
同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞
当一个应用程序调用了 read 方法引发的事件
阻塞IO一旦等待数据处发生阻塞,这个线程就什么也做不了了,只能老老实实的等待数据。不光用户态要等,内核态也要等。就像是一个单道环,没有回头路。
张三去买一包 程序员安慰剂,到了药店,药店老板说要等一阵子才有货,等一天或者等一年,张三都在等,一直等到货来了,买到了,才回去。
非阻塞IO一旦用户态发现内核态没有数据,就会立刻返回。下一次 where(true) 循环的时候,再去查看有无数据,一旦有数据了,就不会立刻返回。而是复制数据并返回。
张三去买一包 程序员安慰剂,到了药店,药店老板说要等一阵子才有货,张三一听转身就走,第二次,张三又去了,老板说有货,张三就买到货并返回。张三买货的期间是不能做其他事情的
多路复用用户态调用 select 方法,查看内核是否有事件,没有事件则阻塞,有事件内核会通知用户态有新事件发生。用户线程就会调用 read 方法进行读取,需要复制数据,就会发生阻塞
张三去买一包 程序员安慰剂,但是他不清楚药店有没有货,就打电话给药店老板,老板说现在没有,等有了我告诉你。过了一两天,有货了,老板打电话给张三,让他来拿货。于是张三过去买了一包又回来了。
与阻塞IO不同的是,多路复用每次获取的是一批事件。
例如:张三需要吃饭,吃完饭后还要写代码,写完代码后又要去吃饭。这之中发生了等待时间就只能在这里等待了。例如,去吃饭饭店没菜了,厨子去买菜,饭什么时候可以做好,谁也说不清楚。写代码,停电了,什么时候来电,谁也不知道。再次期间就只能干等。多路复用则不同,没菜了?我不去,等你有菜了,来电了我再做事情,例如,饭店打电话说现在没菜了,那张三就会呆在家,哪里也不去,公司打电话说断电了,张三也哪里都不会去。直到饭店说买到菜了,公司说来电了,张三才会起身去吃饭,去公司上班。他获取的是一批事件,不会陷入某一个事件中等待。
通过上面的案例,可以得出一个结论,就是当用户线程去处理请求,说明此时已经有一个或多个请求要处理了,等待的时间已经在 select 处等待过了,一次性的处理多个事件。
同步:线程自己去获取结果(一个线程) 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)阻塞IO :自己调用read 方法,自己接收请求。同步阻塞
阻塞IO :自己调用read 方法,自己接收请求。同步非阻塞
多路复用:自己调用read 方法,自己接收请求。同步
异步非阻塞:当前线程调用完 Thread1 后会向下继续运行,不会阻塞张三想买一包 程序员安慰剂,但自己又很忙,于是他叫来自己的小弟李四去买,张三去上班了,李四买完药后,回到家,拿给张三。
package com.zhao.c1;
import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;
@Slf4j
public class AioFileChannel {
public static void main(String[] args) throws IOException {
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {
// 参数1:ByteBuffer
// 参数2:读取的起始位置
// 参数3:附件
// 参数4:回调对象
ByteBuffer buffer = ByteBuffer.allocate(16);
log.debug("read begin");
channel.read(buffer, 0, buffer, new CompletionHandler() {
// read 成功调用此方法
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed");
attachment.flip();
ByteBufferUtil.debugAll(attachment);
}
// read 失败调用此方法
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
log.debug("read end");
} catch (IOException e) {
e.printStackTrace();
}
System.in.read();
}
}
主线程打印完 read begin 和 read end 后,Thread 1 才开始运行.
零拷贝
package com.zhao.c1;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;
public class ClientDemo {
public static void main(String[] args) throws IOException {
//创建客户端的Socket对象(Socket)
//Socket(String host, int port) 创建流套接字并将其连接到指定主机上的指定端口号
Socket socket = new Socket("192.168.1.66",10000);
File f = new File("data.txt");
RandomAccessFile file = new RandomAccessFile(f,"r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
//获取输出流,写数据
socket.getOutputStream().write(buf);
//释放资源
socket.close();
}
}
package com.zhao.c1;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;
public class ClientDemo {
public static void main(String[] args) throws IOException {
//创建客户端的Socket对象(Socket)
//Socket(String host, int port) 创建流套接字并将其连接到指定主机上的指定端口号
Socket socket = new Socket("192.168.1.66",10000);
File f = new File("data.txt");
RandomAccessFile file = new RandomAccessFile(f,"r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
//获取输出流,写数据
socket.getOutputStream().write(buf);
//释放资源
socket.close();
}
}
以上程序用户态和内核态之间的转换如下图
程序调用read 方法后,会有用户态转换成内核态。方法调用结束会有内核态转换为用户态,返回结果。会发生 1,2步骤的内容
程序调用 write 方法,会有用户态转换为内核态,会发生 3,4步骤的内容
程序运行,会发生3次 用户态与内核态的转换,4次 数据复制
二次优化在 Netty基础 NIO中有介绍过使用 ByteBuffer.allocateDirect(16),创建的 buffer 使用的是直接内存,会减少一次拷贝。
buffer使用的是操作系统中的内存,当磁盘想缓冲区写入时,byte 数组和内核缓冲区用的是同一块内存。减少了一次拷贝
三次优化无论是使用原始读取还是使用直接内存的 ByteBuffer,都绕不开java程序,linux 2.1 后提供的 sendFile 方法 ,java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
现在只需要 一次用户态到内核态的切换,是在 transferTo 方法发生后,3次 数据复制
四次优化linux 2.4
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu
零拷贝,不需要在java 层面发生拷贝,拷贝只发生在网络与操作系统之间
优点和适用场景
-
更少的用户态与内核态的切换
-
不利用 cpu 计算,减少 cpu 缓存伪共享
-
零拷贝适合小文件传输



