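Java NIO refers to the Java New IO library. It is built on the I/O multiplexing model and has three core components: Buffer, Channel, and Selector. The sections below test and study each of them in turn.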
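Buffer
When reading from a channel, data flows from the Channel into the Buffer; when writing to a channel, data flows from the Buffer into the Channel. A Buffer can be thought of as a temporary in-memory staging area, while a Channel is the connection to the real data source or destination (a file, a socket, and so on) and does not hold data itself. Using IntBuffer as an example: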
    import java.nio.IntBuffer;

    // The class name is illustrative; the original listing shows only the members.
    public class BufferTest {
        public static IntBuffer intBuffer;

        // Allocate a heap buffer with room for 10 ints.
        public static void init() {
            intBuffer = IntBuffer.allocate(10);
            System.out.println("init done.");
        }

        // Print the three cursor fields of the buffer.
        public static void display() {
            System.out.println("================= Java NIO Buffer Display =================");
            System.out.println("limit " + intBuffer.limit());
            System.out.println("position " + intBuffer.position());
            System.out.println("capacity " + intBuffer.capacity());
        }

        // Each put() stores one int and advances position by 1.
        public static void write() {
            System.out.println("================= put 5 int num into intBuffer =================");
            for (int i = 0; i < 5; i++) {
                intBuffer.put(i);
            }
            System.out.println("write done.");
        }

        // Each get() returns the int at position and advances position by 1.
        public static void read() {
            System.out.println("================= read 5 int num from intBuffer =================");
            for (int i = 0; i < 5; i++) {
                System.out.println(intBuffer.get());
            }
            System.out.println("read done.");
        }

        public static void main(String[] args) {
            init();
            display();
            write();
            display();
            // Switch to read mode: limit = position, position = 0
            intBuffer.flip();
            display();
            read();
            display();
            // Reset for writing: position = 0, limit = capacity (data is not erased)
            intBuffer.clear();
            display();
        }
    }
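The main method above drives the buffer through write, flip, read, and clear. As a complementary sketch, the following shows the same cursor fields in a compact `position/limit/capacity` form and adds `compact()`, which the original test does not use. The class name `BufferStateSketch` and the helper `state` are hypothetical names chosen for illustration.

```java
import java.nio.IntBuffer;

public class BufferStateSketch {
    // Formats the cursor fields of a buffer as "position/limit/capacity".
    static String state(IntBuffer b) {
        return b.position() + "/" + b.limit() + "/" + b.capacity();
    }

    public static void main(String[] args) {
        IntBuffer buf = IntBuffer.allocate(10);
        System.out.println("new:     " + state(buf)); // 0/10/10

        buf.put(1).put(2).put(3);                     // three writes advance position to 3
        System.out.println("put x3:  " + state(buf)); // 3/10/10

        buf.flip();                                   // limit = old position, position = 0
        System.out.println("flip:    " + state(buf)); // 0/3/10

        buf.get();                                    // consume one element, position = 1
        buf.compact();                                // move the 2 unread ints to the front
        System.out.println("compact: " + state(buf)); // 2/10/10

        buf.clear();                                  // position = 0, limit = capacity
        System.out.println("clear:   " + state(buf)); // 0/10/10
    }
}
```

`compact()` is useful when a buffer has been only partly consumed and more data should be appended behind the unread part, whereas `clear()` simply discards the cursor state.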
Running main() of the IntBuffer test prints:

    init done.
    ================= Java NIO Buffer Display =================
    limit 10
    position 0
    capacity 10
    ================= put 5 int num into intBuffer =================
    write done.
    ================= Java NIO Buffer Display =================
    limit 10
    position 5
    capacity 10
    ================= Java NIO Buffer Display =================
    limit 5
    position 0
    capacity 10
    ================= read 5 int num from intBuffer =================
    0
    1
    2
    3
    4
    read done.
    ================= Java NIO Buffer Display =================
    limit 5
    position 5
    capacity 10
    ================= Java NIO Buffer Display =================
    limit 10
    position 0
    capacity 10

Channel
Using a ByteBuffer as the intermediate buffer, a simple file copy looks like this:
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public static void copyFile(String srcFileName, String destFileName) {
        // try-with-resources closes the channels and streams automatically,
        // in reverse order of declaration
        try (FileInputStream fileInputStream = new FileInputStream(srcFileName);
             FileOutputStream fileOutputStream = new FileOutputStream(destFileName);
             // Obtain the channels
             FileChannel inputStreamChannel = fileInputStream.getChannel();
             FileChannel outputStreamChannel = fileOutputStream.getChannel()) {
            // Create the buffer
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            // Read from the source channel into the buffer until end of file
            while (inputStreamChannel.read(byteBuffer) != -1) {
                // Switch the buffer from write mode to read mode
                byteBuffer.flip();
                // write() may consume only part of the buffer, so loop until it is drained
                while (byteBuffer.hasRemaining()) {
                    outputStreamChannel.write(byteBuffer);
                }
                // Reset position and limit for the next read
                byteBuffer.clear();
            }
            // Force content and metadata changes to the storage device
            outputStreamChannel.force(true);
        } catch (IOException e) {
            // FileNotFoundException is a subclass of IOException, so one catch covers both
            e.printStackTrace();
        }
    }
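One way to check a read/flip/write/clear loop like the one above is to round-trip a temporary file and compare the bytes. The following is a minimal sketch under stated assumptions: it uses `FileChannel.open` with `Path` arguments instead of the stream-based `getChannel()` in the original, and the names `ChannelCopySketch` and `copy` are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

public class ChannelCopySketch {
    // Copies src to dest through a ByteBuffer of the given size;
    // returns the number of bytes written.
    static long copy(Path src, Path dest, int bufferSize) throws IOException {
        long total = 0;
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dest, StandardOpenOption.WRITE,
                     StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
            while (in.read(buffer) != -1) {       // Channel -> Buffer
                buffer.flip();                    // write mode -> read mode
                while (buffer.hasRemaining()) {
                    total += out.write(buffer);   // Buffer -> Channel
                }
                buffer.clear();                   // ready for the next read
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("nio-src", ".bin");
        Path dest = Files.createTempFile("nio-dest", ".bin");
        try {
            // 5000 bytes is deliberately not a multiple of the buffer size
            byte[] data = new byte[5000];
            for (int i = 0; i < data.length; i++) {
                data[i] = (byte) i;
            }
            Files.write(src, data);
            long copied = copy(src, dest, 1024);
            System.out.println("copied " + copied + " bytes, identical: "
                    + Arrays.equals(data, Files.readAllBytes(dest)));
        } finally {
            Files.deleteIfExists(src);
            Files.deleteIfExists(dest);
        }
    }
}
```

Choosing a file size that is not a multiple of the buffer size exercises the final partial read, which is where a missing `flip()` or `clear()` typically shows up as corrupted output.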
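A potentially more efficient approach is the channel's transferFrom method, which lets the operating system move the bytes without passing them through a user-space buffer where the platform supports it: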
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.channels.FileChannel;

    public static void copyFileFast(String srcFileName, String destFileName) {
        long startTime = System.currentTimeMillis();
        // try-with-resources closes the channels and streams automatically
        try (FileInputStream fileInputStream = new FileInputStream(srcFileName);
             FileOutputStream fileOutputStream = new FileOutputStream(destFileName);
             // Obtain the channels
             FileChannel inputStreamChannel = fileInputStream.getChannel();
             FileChannel outputStreamChannel = fileOutputStream.getChannel()) {
            long size = inputStreamChannel.size();
            System.out.println("size: " + size);
            long position = 0;
            while (position < size) {
                // Copy at most 1,024,000,000 bytes (roughly 1 GB) per call
                long count = Math.min(size - position, 1024000000L);
                // position is the offset in the destination file; the source channel
                // is read from its own current position, which advances as bytes move
                long transferred = outputStreamChannel.transferFrom(inputStreamChannel, position, count);
                if (transferred == 0) {
                    // Nothing left to read (e.g. the source shrank); avoid looping forever
                    break;
                }
                position += transferred;
                System.out.println("position: " + position);
            }
            // Force content and metadata changes to the storage device
            outputStreamChannel.force(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Elapsed time (ms): " + (endTime - startTime));
    }

Output:

    size: 4781506560
    position: 1024000000
    position: 2048000000
    position: 3072000000
    position: 4096000000
    position: 4781506560
    Elapsed time (ms): 187682

Selector
A Selector can monitor multiple Channels at the same time. Using SocketChannel and ServerSocketChannel as an example, the following implements a simple client/server exchange:
    // SelectorTest.java
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.time.LocalDateTime;
    import java.util.Iterator;

    public class SelectorTest {
        public static void main(String[] args) {
            try {
                // Start the server
                new Server("Server1").listenAndRun();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    class Client {
        private String clientName;

        public Client(String clientName) {
            this.clientName = clientName;
        }

        public void sendData() throws IOException {
            InetSocketAddress address = new InetSocketAddress("127.0.0.1", 12345);
            // open(address) connects in blocking mode and returns a connected channel
            SocketChannel socketChannel = SocketChannel.open(address);
            socketChannel.configureBlocking(false);
            // The channel is already connected, so finishConnect() returns true at once
            while (!socketChannel.finishConnect()) {
            }
            // Send the data
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            byteBuffer.put(this.clientName.concat(": ").concat(LocalDateTime.now().toString()).getBytes());
            byteBuffer.flip();
            // A non-blocking write may send only part of the buffer, so loop until drained
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }
            socketChannel.shutdownOutput();
            socketChannel.close();
        }
    }

    class Server {
        private String serverName;

        public Server(String serverName) {
            this.serverName = serverName;
        }

        public void listenAndRun() throws IOException {
            InetSocketAddress address = new InetSocketAddress(12345);
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(address);
            // Channels must be non-blocking before they can be registered with a Selector
            serverSocketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            // select() blocks until at least one registered channel is ready
            while (selector.select() > 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey next = iterator.next();
                    if (next.isAcceptable()) {
                        // New connection: register it for read events
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (next.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) next.channel();
                        // Read the data
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        while (socketChannel.read(byteBuffer) > 0) {
                            byteBuffer.flip();
                            // Decode only the bytes actually read, not the whole backing array
                            System.out.println(new String(byteBuffer.array(), 0, byteBuffer.limit()));
                            byteBuffer.clear();
                        }
                        socketChannel.close();
                    } else {
                        System.out.println("other ops");
                    }
                    // Remove each handled key; the Selector never clears the selected set itself
                    iterator.remove();
                }
            }
            selector.close();
        }
    }

    // Test.java
    public class Test {
        public static void testSocket() {
            new Thread(() -> {
                try {
                    Client clientB = new Client("ClientB");
                    // Send one timestamped message every 2 seconds
                    while (true) {
                        clientB.sendData();
                        Thread.sleep(2000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }

        public static void main(String[] args) {
            testSocket();
        }
    }
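The listing above needs a server process and separate client processes. For experimenting with the select loop in a single process, the following is a minimal sketch under stated assumptions: it binds to an ephemeral port on 127.0.0.1 instead of port 12345, handles exactly one connection, and decodes the bytes as UTF-8 (which is only safe per chunk for ASCII text). The names `SelectorLoopSketch` and `receiveOnce` are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorLoopSketch {
    // Accepts one connection on the bound server channel, reads until the peer
    // closes its side, and returns everything received as text.
    static String receiveOnce(ServerSocketChannel server) throws IOException {
        StringBuilder received = new StringBuilder();
        try (Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            boolean done = false;
            while (!done) {
                selector.select(); // blocks until a registered channel is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove(); // handled keys must be removed by the caller
                    if (key.isAcceptable()) {
                        SocketChannel ch = server.accept();
                        if (ch != null) { // accept() can return null in non-blocking mode
                            ch.configureBlocking(false);
                            ch.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel ch = (SocketChannel) key.channel();
                        ByteBuffer buf = ByteBuffer.allocate(64);
                        int n;
                        while ((n = ch.read(buf)) > 0) {
                            buf.flip();
                            received.append(StandardCharsets.UTF_8.decode(buf));
                            buf.clear();
                        }
                        if (n == -1) { // peer closed: stop; n == 0 means wait for more
                            ch.close();
                            done = true;
                        }
                    }
                }
            }
        }
        return received.toString();
    }

    public static void main(String[] args) throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            Thread client = new Thread(() -> {
                try (SocketChannel ch = SocketChannel.open(addr)) {
                    ByteBuffer out = ByteBuffer.wrap("hello selector".getBytes(StandardCharsets.UTF_8));
                    while (out.hasRemaining()) {
                        ch.write(out);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            client.start();
            System.out.println(receiveOnce(server));
            client.join();
        }
    }
}
```

Unlike the server above, this sketch closes the connection only when `read` returns -1. A return value of 0 just means no data is available yet, so the channel stays registered and is read again on the next readiness event.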
With the server started first and two clients (ClientA and ClientB) started afterwards, the server prints:
    ClientA: 2022-05-02T17:56:22.606
    ClientA: 2022-05-02T17:56:24.608
    ClientB: 2022-05-02T17:56:24.640
    ClientA: 2022-05-02T17:56:26.616
    ClientB: 2022-05-02T17:56:26.647
    ClientA: 2022-05-02T17:56:28.628
    ClientB: 2022-05-02T17:56:28.660
    ClientA: 2022-05-02T17:56:30.643
    ClientB: 2022-05-02T17:56:30.674
    ClientA: 2022-05-02T17:56:32.650
    ClientB: 2022-05-02T17:56:32.682
After the server is shut down, the client reports a connection error:
    java.net.ConnectException: Connection refused: connect
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:454)
        at sun.nio.ch.Net.connect(Net.java:446)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
        at java.nio.channels.SocketChannel.open(SocketChannel.java:189)
        at Client.sendData(SelectorTest.java:41)
        at Test2.lambda$testSocket$0(Test2.java:14)
        at java.lang.Thread.run(Thread.java:745)

    Process finished with exit code 0
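The trace shows the exception coming out of `SocketChannel.open(address)`, which ends the client's send loop. If the client should survive a missing server, one option is to catch `java.net.ConnectException` and report the failure instead. The following is a minimal sketch, not part of the original client; `ConnectCheckSketch` and `canConnect` are hypothetical names, and the demo assumes that a port which was just released refuses connections (a firewall could make the attempt time out instead).

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SocketChannel;

public class ConnectCheckSketch {
    // Returns true if a TCP connection to the address succeeds,
    // false if the connection is refused. Other I/O errors propagate.
    static boolean canConnect(InetSocketAddress address) throws IOException {
        // The catch clause also covers exceptions thrown while opening the resource
        try (SocketChannel channel = SocketChannel.open(address)) {
            return channel.isConnected();
        } catch (ConnectException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Grab a free port, then release it so that nothing is listening there
        int port;
        try (ServerSocket probe = new ServerSocket(0)) {
            port = probe.getLocalPort();
        }
        InetSocketAddress closed = new InetSocketAddress("127.0.0.1", port);
        System.out.println("connect to closed port: " + canConnect(closed));
    }
}
```

A client loop could call such a check, or wrap `sendData()` in the same kind of catch, and sleep before retrying rather than terminating on the first refused connection.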



