- java NIO是jdk1.4引入的新的IO API、它可以代替标准的java IO API。
- NIO支持面相缓冲区、鲫鱼通道IO操作、以更高效的方式进行文件的读写操作。
通常在进行同步I/O操作时、如果读取数据、代码会阻塞直至有可供读取写入的线程、 传统的 server/Client 模式会基于TPR、服务器会为每个客户端请求建立一个线程池、由于该线程池单独负责一个客户请求、所以就出现了一个问题:线程数量的剧增、大量的线程会增大服务器的开销。而大多是情况为了避免这个问题、都采用线程池、并设置线程池的最大数量、但是这就又会带来一个问题:如果线程池中最大线程数是100、而这100个用户都在进行大文件下载、那么第101个用户就算是要请求一个几KB的页面也会阻塞、无法及时处理。
非阻塞NIO采用Reactor模式的工作方式、I/O调用不会被阻塞、相反是注册感兴趣的特定I/O事件、如可读数据到达、新的套接字连接等等、再发生特定事件时、系统再通知我们。NIO中实现非阻塞I/O的核心对象就是selector
- select是注册IO事件的地方、而当我们感兴趣的事件发生时、就是这个对象告诉我们所发生的事件
selector的本质是延迟IO操作到真正发生IO的时候、而不是以前的IO流只要打开就一直等待IO操作
| IO | NIO |
|---|---|
| 面向流 | 面向缓冲区 |
| 阻塞IO | 非阻塞IO |
| (无) | 选择器 |
核心部分:
- Channels
- Buffers
- Selectors
NIO 中还有很多类和组件、但是Channel、Buffer、Selector构成了核心的API、其他组件例如Pipe、FileLock、只不过是与这三个核心组件共同使用的工具类
Channel 和 IO 中的 Stream 是差不多一个等级的、只不过 Stream 是单向的、也就是只能进行读写中的一个操作
Channel是双向的、也就是即可以用来读也可以用来写、同时读写。因为Channel是全双工的、所以它可以比流更好的映射底层操作系统的API
而NIO中通过Channel封装了对数据源的操作、tongguoChannel我们可以操作数据源、但又不关心数据源的具体物理结构、这个数据源可能是多种的、比如是文件、也可以是网络Socket。再大多数应用中、Channel与文件描述符或者Socket是一一对应的。Channel用于在字节缓冲区和位于通道另一侧的实体之间有效的传输数据。
public interface Channel extends Closeable {
// 通道是否打开?
public boolean isOpen();
// 关闭通道
public void close() throws IOException;
}
Channel主要实现有:FileChannel、DataGramChannel、SocketChannel、ServerSocketChannel。分别对应IO、UDP、TCP(ServerClient)
4.2、Channel 实现- FileChannel 从文件中读写数据
- DataGramChannel 能通过 UDP 读写网络中的数据
- SocketChannel 能通过 TCP 读写网络中的数据
- ServerSocketChannel 可以进啊听新进来的 TCP 连接、像 Web 服务那样、对每一个新进来的连接都会创建一个SocketChannel
FieChannel类可以实现常用的 read、write、scatter、gather操作、同时它也提供了很多专用于文件的新方法、这些方法中的许多都是我们所属洗的文件操作
public class FileChannelDemo1 {
// 通过 FIleChannel 读取数据到 Buffer 中
public static void main(String[] args) throws Exception {
// 1、创建 FIleChannel
RandomAccessFile aFile = new RandomAccessFile("D:\Cloud\note\Channel.txt","rw");
FileChannel channel = aFile.getChannel();
// 2、创建 Buffer
ByteBuffer buf = ByteBuffer.allocate(1024);
// 3、读取数据到 Buffer 中
int bytesRead = channel.read(buf);
while (bytesRead != -1) {
System.out.println("读取了:" + bytesRead);
buf.flip();
while (buf.hasRemaining()) {
System.out.println((char) buf.get());
}
buf.clear();
bytesRead = channel.read(buf);
}
aFile.close();
System.out.println("读取结束~");
}
}
public class FileChannelDemo2 {
// FileChannel 写操作
public static void main(String[] args) throws Exception {
// 1、打开FileCHannel
RandomAccessFile accessFile = new RandomAccessFile("D:\Cloud\note\Channel.txt","rw");
FileChannel channel = accessFile.getChannel();
// 2、创建 Buffer对象
ByteBuffer buffer = ByteBuffer.allocate(1024);
String newData = "data at Cloud";
// 3、向缓冲区写入内容
buffer.put(newData.getBytes());
// 4、切换模式
buffer.flip();
// 5、FileChannel 从缓冲区写入到文件中
while (buffer.hasRemaining()){
channel.write(buffer);
}
// 6、关闭Channel
channel.close();
}
}
FIleChannel.write 是在 while 循环中调用,因为无法保证 write 方法一次能向FileChannel写入多少直接、因此需要重复调用 write 方法、直到 Buffer 中已经没有尚未写入通道的字节
4.2.2、FileChannel 方法介绍- position 获取FileChannel的当前位置、也可以通过传入 Long pos 参数 设置FileChannel 的当前位置
long pos = channel.position(); channel.position(pos + 123);
如果姜位置设置在文件结束符之后、然后试图从文件通道中读取数据、读方法将返回 -1 (文件结束标志)
如果姜位置设置在文件结束符之后、然后向通道中写数据、文件将撑大到当前位置并写入数据、这可能导致 “文件空洞” 问题、磁盘上物理文件中写入的数据间有空隙
2. size 返回该实例所关联的文件大小
3. truncate 截取一个文件、截取文件时、文件将指定长度的后面部分删除。
// 截取文件袋前1024个字节数据 channel.truncate(1024);
- force 将通道中尚未写到磁盘中的数据强制写到磁盘中、出于性能方面的考虑、操作系统会将数据存在内存中、所以无法保证写到FileChannel 里的数据一定会即时写到磁盘中。
force方法有一个boolean类型的参数、指明是否同时姜文件元数据(权限信息等)写到磁盘中 - transferTo、transferFrom
两个方法的传输方向是相反的
通道之间的数据传输:
public class FileChannelDemo3 {
public static void main(String[] args) throws Exception {
// TODO transferFrom
// 1、创建两个FileChannel
RandomAccessFile aFile = new RandomAccessFile("D:\Cloud\note\Channel.txt","rw");
FileChannel fromChannel = aFile.getChannel();// 1、打开FileCHannel
RandomAccessFile bFile = new RandomAccessFile("D:\Cloud\note\0822.txt","rw");
FileChannel toChannel = bFile.getChannel();
// fromChannel 传输到 toChannel
long position = 0;
long size = fromChannel.size();
toChannel.transferTo(fromChannel,position,size);
// 关闭资源
aFile.close();
bFile.close();
System.out.println("over!");
}
}
public class FileChannelDemo3 {
public static void main(String[] args) throws Exception {
// TODO transferTo
// 1、创建两个FileChannel
RandomAccessFile aFile = new RandomAccessFile("D:\Cloud\note\Channel.txt","rw");
// 2、打开FileCHannel
FileChannel fromChannel = aFile.getChannel();
RandomAccessFile bFile = new RandomAccessFile("D:\Cloud\note\0822.txt","rw");
FileChannel toChannel = bFile.getChannel();
// 3、toChannel 传输到 fromChannel
long position = 0;
long size = fromChannel.size();
toChannel.transferTo(position,size,fromChannel);
// 4、关闭资源
aFile.close();
bFile.close();
System.out.println("over!");
}
}
4.3、Socket 通道 介绍
1、 新的Socket通道类可以运行非阻塞模式并且是可选择的,可以激活大程序(网络服务器和中间件)巨大的可伸缩性和灵活性、避免了每个socket连接使用一个线程的必要、也避免了管理大量线程所需的上下文交换开销。借助新的NIO类、一个或几个线程就可以管理成百上千的活动socket连接、并且只有很少甚至可能没有性能损失、所有socket 通道(DatagramChannel、SocketChannel、ServerSocketChannel)都继承位于java.nio.channels.spi包中的AbstractSelectableChannel、这意味着我们可以用一个selector对象来执行socket通道的就绪选择(readiness selection)
2、请注意DatagramChannel、SocketChannel 实现定义读写功能的接口、而ServerScoketChannel不实现。ServerSocketChannel负责监听传入的连接和创建新的SocketChannel对象、它本身从不传输数据。
3、socket 和 socket通道 之间的关系
通道是一个连接 I/O 服务导管并提供与该服务交互的方法、就某个socket而言、它不会再次实现与之对应的socket通道类中的socket协议API,而 java.net 中已经存在的socket通道都可以被大多数协议操作重复使用
全部socket 通道类(DatagramChannel、SocketChannel、ServerSocketChannel)在被实例化时都会创建一个对等 socket对象、这些是我们所熟悉的java.net类(Socket、ServerSocket、DatagramSocket)、它们已经被更新以识别通道。对等 socket可以通过调用 socket 方法从一个通道上获取。这三个java.net 类现在都有getChannel方法
4、要把一个socket通道置于非阻塞模式、我们要依靠所有socket 通道类的公有超级类 : selectableChannel。就绪选择(readiness selection) 是一种可以用来查询通道的机制、该查询可以判断通道是否准备好执行一个目标操作、如果读或写。非阻塞 I/O 和可选择性是紧密相连的、那也正是管理阻塞模式的API代码要在selectableChannel 超级类中定义的原因。
设置或重新设置一个通道的阻塞模式是很简单的、只要调用configureBlocking方法即可、传递参数值为true则设为阻塞模式、参数值为false设为非阻塞模式、可以调用 isBlocking方法来判断某个 socket 通道当前处于哪个模式。
ServerSocketChannel是一个基于通道的socket监听器。它同我们所熟悉的 java.net.ServerSocketChannel 执行相同的任务、不过它增加了通道语义、因此能够在非阻塞模式下运行。
由于 ServerSocketChannel 没有 bind 方法、因此有必要取出对等的 socket 并使用它来绑定到一个端口以开始监听连接。我们也是使用对等 ServerSocket 的 API 来根据需要设置其他的 socket 选项。
ServerSocketChannel也有 accept方法。一旦创建一个 ServerSocketChannel 并用对等 socket 绑定了它、然后就可以再其中一个上调用 accept 如果你选择在 ServerSocket 上调用 accept方法、那么它会同任何其他的 ServerSocket表现一样的行为:总是阻塞并返回一个 java.net.Socket 对象。如果您选择在ServerSicketChannel 上调用 accpet方法则会返回 SocketChannel 类型的对象、返回的对象能够在非阻塞模式下运行。
大白话就是:
ServerSocketChannel 的 accpet 方法会返回 SocketChannel 类型对象、SocketChannel 可以在非阻塞模式下运行。
其他 Socket 的 accept 方法会返回 SocketChannel 类型对象、SocketChannel 以非阻塞模式被调用、当没有传入连接在等待时、ServerSocketChannel.accept() 会立即返回null、正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得到实现。我们使用一个选择器实例来注册 ServerSocketChannel 对象以实现新连接到达时自动通知功能。
public class ServerSocketChannelDemo {
public static void main(String[] args) throws Exception {
// 端口号
int port = 8888;
// buffer
ByteBuffer buffer = ByteBuffer.wrap("hello at Cloud".getBytes());
// ServerSocketChannel
ServerSocketChannel ssc = ServerSocketChannel.open();
// 绑定
ssc.socket().bind(new InetSocketAddress(port));
// 设置非阻塞模式
ssc.configureBlocking(false);
// 持续监听是否有新的链接连入
while (true) {
System.out.println("Waiting fot connections~");
SocketChannel sc = ssc.accept();
if (sc == null) { // 没有链接传入
System.out.println("null");
Thread.sleep(2000);
} else {
System.out.println("incoming connection from:" + sc.socket().getRemoteSocketAddress());
buffer.rewind(); // 指针指向0
sc.write(buffer); // 开始写操作
sc.close(); // 关闭资源
}
}
}
}
注意事项:
1、打开 ServerSocketChannel
- 通过调用 ServerSocketChannel 方法来打开 ServerSocketChannel
- ServerSocketChannel ssc = ServerSocketChannel.open();
2、关闭 ServerSocketChannel
- 通过调用 ServerSocketChannel.closes 方法关闭 ServerSocketChannel
- serverSocketChannel.close();
3、监听新的链接
- 通过 ServerSocketChannel 方法监听新进来的链接。当 accept 方法返回时、它返回一个包含新进来的连接的 SockeChannel。因此、accept 方法会一直阻塞到有新连接到达。通常不会仅仅监听一个连接、在 while 循环中调用 accept方法。
4、阻塞模式
- 会在 SocketChannel sc = ssc.accept();这里阻塞住进程
5、非阻塞模式 - 在非阻塞模式下、accept会立刻返回结果、如果没有新进来的链接、返回的将是 null
Java NIO 的 SocketChannel 是一个连接到 TCP 网络套接字的通道。
A selectable channel for stream-oriented connecting sockets.
SocketChannel是一种面向流连接sockets套接字的可选择通道。
- SocketChannel 是用来连接Socket套接字
- SocketChannel 主要用途用来处理网络 I/O 的通道
- SocketChannel 是基于TCP连接传输
- SocketChannel 实现了可选择通道、可以被多路复用的
1、对于已经存在的 socket 不能创建 SocketChannel
2、SocketChannel 中提供的 open 接口创建的 Channel 并没有进行网络级联、需要使用 connect 接口连接到指定地址
3、未进行连接的SocketChannel 执行 I/O 操作时、会抛出NotyetConnectedException
4、SocketChannel支持两种 I/O模式:阻塞和非阻塞
5、SOCketChannel支持异步关闭。如果 SocketChannel在一个线程上 read阻塞、另一个线程对该SocketChannel 调用 shutdownlnput、则读阻塞的线程将返回 -1 表示没有读取任何数据。如果 SocketChannel 在一个线程上write 阻塞、另一个线程对该 SocketChannel 调用 shutdownWrite、则写阻塞的线程将抛出 AsynchronousCloseException
6、SocketChannel 支持设定参数
- SO_SNDBUF 套接字发送缓冲区大小
- SO_RCVBUF 套接字接收缓冲区大小
- SO_KEEPALIVE 保活连接
- SO_REUSEADDR 复用地址
- SO_LINGER 有数据传输时延缓关闭 Channel (只有在非阻塞模式下有用)
- TCP_NODELAY 禁用Nagle算法
方法一
// 1、创建SocketChannel
// 1.1、hostname 主机IP port 端口号
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com",80));
方法二
SocketChannel socketChannel1 = SocketChannel.open();
socketChannel1.connect(new InetSocketAddress("www.baidu.com",80));
4.4.3、连接校验
// 测试 SocketChannel 是否为 open 状态 socketChannel.isOpen(); // 测试 SocketChannel 是否已经被连接 socketChannel.isConnected(); // 测试 SocketChannel 是否正在运行 socketChannel.isConnectionPending(); // 校验正在进行套接字连接的 SocketChannel socketChannel.finishConnect();4.4.4、设置 阻塞/非阻塞
// true 阻塞 false 非阻塞 socketChannel.configureBlocking(false);4.4.5、读写模式
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com",80));
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("read over!");
以上为阻塞模式读、当执行到 read 处、线程将阻塞、控制台无法打印 read over!
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com",80));
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("read over!");
以上为非阻塞模式、控制台将打印 read over
读写都是面向缓冲区、这个读写方式与 FileChannel 相同
通过 setOptions 方法可以设置 socket 套接字的相关参数
socketChannel.setOption( StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE) .setOption(StandardSocketOptions.TCP_NODELAY,Boolean.TRUE);
可以通过 getOption 获取相关参数的值。如默认的接收缓冲区大小是 8192byte。SocketChannel 还支持多路复用。
// 获取参数 getOption socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE); socketChannel.getOption(StandardSocketOptions.SO_RCVBUF);4.5、DatagramChannel
正如SocketChannel 对应 Socket、ServerSocketChannel 对应 ServerSocket、每一个 DatagramChannel 对象也有一个关联的DatagramSocket 对象。正如 SocketChannel 模拟连接导向的流协议(TCP/IP)、DatagramChannel 则模拟包导向的无连接协议(UDP/IP)、DatagramChannel 是无连接的、每个数据包都是一个自包含的实体、拥有它自己的目的地地址及不依赖其他数据包的数据负载。与面向流的Socket不同、DatagramChannel 可以发送单独的数据包给不同的目的地址。同样、DatagramChannel 对象也可以接收来自任意地址的数据包、每个到达的数据包都含有它来自何处的信息(源地址)
4.5.1、打开 DatagramChannel打开10086 端口接受UDP数据包
DatagramChannel server = DatagramChannel.open(); server.socket().bind(new InetSocketAddress(10086));4.5.2、接收数据
通过 receive() 接收 UDP 包
ByteBuffer receiveBuffer = ByteBuffer.allocate(64); receiveBuffer.clear(); SocketAddress receiveAddr = server.receive(receiverBuffer);
SocketAddress 可以获得发包的ip、端口等信息、用 toString 查看
4.5.3、发送数据通过 send() 发送 UDP 包
DatagramChannel server = DatagrChannel.open();
ByteBuffer senBuffer = ByteBuffer.wrap("client send".getBytes());
server.send(sendBuffer,new InetSocketAddress("127.0.0.1",10086));
4.5.4、连接
UDP不存在真正意义上的连接、这里的连接是向特定服务地址用 read 和 write 接收发送数据包
client.connect(new InetSocketAddress("127.0.0.1",10086));
int readSize = client.read(sendBuffer);
server.write(sendBuffer);
read() 和 write() 只有在 connect() 后才能使用、不然会抛 NotYetConnectedException 异常。用 read() 接收时、如果没有接收到包、会抛 PortUnreachableException 异常
@Test
public void testConnect() throws Exception {
// 打开 DatagaramChannel
DatagramChannel connChannel = DatagramChannel.open();
// 绑定
connChannel.bind(new InetSocketAddress(9999));
// 连接
connChannel.connect(new InetSocketAddress("127.0.01",9999));
// write
connChannel.write(ByteBuffer.wrap("发送 at cloud".getBytes("UTF-8")));
// buffer
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
while (true) {
readBuffer.clear();
connChannel.read(readBuffer);
readBuffer.flip();
System.out.println(Charset.forName("UTF-8").decode(readBuffer));
}
}
4.5.5、DatagramChannel 示例
@Test
public void sendDatagram () throws Exception {
// 打开DatagramChannel
DatagramChannel sendChannel = DatagramChannel.open();
ByteBuffer buffer = ByteBuffer.wrap("发送 at Cloud".getBytes("UTF-8"));
InetSocketAddress sendAddress = new InetSocketAddress("127.0.0.1",9999);
// 发送
while (true) {
sendChannel.send(buffer,sendAddress);
System.out.println("发送完成!");
Thread.sleep(1000);
}
}
@Test
public void receiveDatagram () throws IOException {
// 打开 DatagaramChannel
DatagramChannel receiveChannel = DatagramChannel.open();
InetSocketAddress receiveAddress = new InetSocketAddress(9999);
// 绑定
receiveChannel.bind(receiveAddress);
// 创建Buffer
ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
while (true) {
receiveBuffer.clear();
SocketAddress socketAddress = receiveChannel.receive(receiveBuffer);
receiveBuffer.flip();
System.out.println("接收数据完成!" + socketAddress.toString() + "n" + Charset.forName("UTF-8").decode(receiveBuffer));
}
}
4.6、Scatter/Gather
scatter/gather 用于描述从Channel中读取或者写入到Channel的操作
- **分散(scatter):**从Channel 中读取是指在杜操作时将读取的数据写入到多个 Buffer 中。因此、Channel 将从 Channel 中读取的数据 “分散” 到多个 Buffer 中
- **聚集(garher):**写入 Channel 是指在写操作时姜多个 Buffer 中的数据 “聚集” 后发送到 Channel
scatter/gather 经常用于需要将传输的数据分开处理的场合、例如传输一个由消息头和消息体组成的消息、你可能会将消息体和消息头分散到不同的buffer中、这样你可以方便的处理消息头和消息体
4.6.1、Scattering ReadsByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = {header,body};
channel.read(bufferArray);
buffer 首先被插入到数组中、然后再将数组作为 channel.read() 的输入参数。read 方法按照 buffer 在数组中的顺序将从 channel 中读取的数据写入到 buffer、当一个 buffer 被写满后、channel 紧接着向另一个 buffer中写
Scattering Reads 在移动下一个 buffer 前、必须填满当前 buffer、这意味着它不适用于动态消息(消息大小不固定)。换句话说、如果存在消息头和消息体、消息头必须完成填充(128byte)、Scattering Reads 才能正常工作
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = {header,body};
channel.write(bufferArray);
buffer 数组是write 方法的入参、write 方法会按照 buffer 在数据中的顺序、将数据写入到 channel、注意只有 position 和 limit 之间的数据才会被写入。因此、如果一个 buffer 的容量为128byte、但是仅仅包含58byte到数据、那么这58byte的数据将会被写到 channel 中、因此与 Scattering Reads 相反、Gathering writes 能较好的处理动态消息
5、BuffersBuffer 用于和 NIO 通道进行交互。数据是从通道写入到缓冲区,从缓冲区写入到通道中
Buffer 通常的操作
1、将数据写入缓冲区
2、调用 buffer.flip() 反转读写模式
3、从缓冲区读取数据
4、调用 buffer.clear() 或 buffer.compact() 清除缓冲区内容
主要是作为缓冲区的作用
Biffers实现有:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer
在 NIO 中、所有的缓冲区类型都继承于抽象类 Buffer、最常用的就是 ByteBuffer、对于 Java 中的基本类型、基本都有一个具体 Buffer 类型与之相对应、他们之间的继承关系如下图所示
1、使用 Buffer 读写数据、一半遵循以下四个步骤:
- 写入数据到 Buffer
- 调用 flip 方法
- 从
选择器、事件监听
Selector 运行单线程处理多个Channel、如果你的应用打开了多个通道、但每个连接的流量都很低、使用selector就会很方便、例如一个聊天服务器中、要使用selector、得向selector注册Channel、然后调用它的select()方法、这个方法会一直阻塞到某个注册的通道有事件就绪、一旦方法返回、线程就可以处理这些事件、事件的例子有:新的连接进来、数据接收。



