栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

NIO 基础知识

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

NIO 基础知识

NIO 三大组件 Channel & Buffer

channel 它是读写数据的双向通道,可以从 channel 将数据读入buffer,也可以将buffer的数据写入 channel。

常见的Channel有:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

buffer 用来缓冲数据,常见的 buffer 有:

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer
Selector

selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上。适合连接数特别多,但流量低的场景。

调用selector的select()方法会阻塞,直到channel发生了读写就绪事件,这些事件发生,select方法就会返回这些事件交给thread来处理。

ByteBuffer ByteBuffer正确使用姿势
  1. 向 buffer 写入数据,例如调用 channel.read(buffer)
  2. 调用 flip() 切换至读模式
  3. 从 buffer 读取数据,例如调用 buffer.get()
  4. 调用 clear() 或 compact() 切换至写模式
  5. 重复 1 ~ 4 步骤

TestByteBuffer

import lombok.extern.slf4j.Slf4j;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

@Slf4j
public class TestByteBuffer {

    public static void main(String[] args) throws FileNotFoundException {
        // 获取 FileChannel 两种方式
        // 1. 输入流 2. RandomAccessFile
        try (FileChannel fileChannel = new FileInputStream("data.txt").getChannel()) {
            ByteBuffer buffer = ByteBuffer.allocate(10);
            while(true) {
                // 向buffer写数据
                int len = fileChannel.read(buffer);
                log.debug("读取到的字节 {}", len);
                if (len == -1) {
                    break;
                }
                buffer.flip(); // 模式切换 切换到读模式
                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    System.out.println((char) b);
                }
                buffer.clear(); // 切换到写模式
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
ByteBuffer内部结构

ByteBuffer的重要属性:

  • capacity(容量)
  • position(读写指针,读到哪里写到哪里的一个指针)
  • limit(读写限制)

初始状态

写模式下,写入4个字节后的状态:

切换到都模式下,即调用flip方法后的状态

读取4个字节后的状态

调用clear()方法后的状态

调用compact方法后,把未读完的数据向前压缩,切换到写模式下:

ByteBuffer常见的方法
  1. 分配空间,可以使用allocate方法为ByteBuffer分配空间,其它buffer类也有该方法
    Bytebuffer buf = ByteBuffer.allocate(16);
    
  2. 向buffer写入数据
    • 调用channel的read方法
      int readBytes = channel.read(buf);
      
    • 调用buffer字节的put方法
      buf.put((byte)127);
      
  3. 从buffer中读数据
    • 调用 channel 的 write 方法
      int rbyte = channel.write(buf);
      
    • 调用buffer的get方法
      byte b = buf.get();
      
  4. ByteBuffer 的 rewind()方法、mark()方法、reset()方法、get()方法、get(i)方法
    import java.nio.ByteBuffer;
    
    import static com.netty.c1.ByteBufferUtil.debugAll;
    
    public class TestByteBufferRead {
    
        public static void main(String[] args) {
    
            ByteBuffer buffer = ByteBuffer.allocate(10);
            buffer.put(new byte[]{'a', 'b', 'c', 'd'});
            buffer.flip();
    
            // 读数据 rewind() 重新回到开始位置,可以重复读数据
            buffer.get(new byte[4]);
            debugAll(buffer);
            buffer.rewind();
            System.out.println((char)buffer.get());
            debugAll(buffer);
            buffer.rewind();
    
            // mark && reset
            // mark 做标记,记录position的位置,reset是将position重置到mark的位置
            System.out.println((char)buffer.get());
            System.out.println((char)buffer.get());
            buffer.mark();
            System.out.println((char)buffer.get());
            System.out.println((char)buffer.get());
            buffer.reset();
            System.out.println((char)buffer.get());
            System.out.println((char)buffer.get());
    
            // get(i)方法,不会改变 position的值
            System.out.println((char)buffer.get(3));
            debugAll(buffer);
        }
    }
    
ByteBuffer 与字符串相互转换
  • 字符串到ByteBuffer

    1. 使用ByteBuffer的put方法
      ByteBuffer buffer = ByteBuffer.allocate(16);
      byte[] bytes = "hello".getBytes();
      buffer.put(bytes);
      
    2. 使用 StandardCharsets 类
      ByteBuffer helloBuffer = StandardCharsets.UTF_8.encode("hello");
      
    3. 使用ByteBuffer的wrap方法
      ByteBuffer byteBuffer = ByteBuffer.wrap("hello".getBytes());
      

    后两种方式,将字符串转换为ByteBuffer后,ByteBuffer都处于读模式下,可以直接读。而第一种方式ByteBuffer还是处于写模式下,需要调用flip()方法后,才能读。

  • ByteBuffer到字符串

    String s = StandardCharsets.UTF_8.decode(helloBuffer).toString();
    
分散读 Scattering Reads

如果我们事先知道如何将大文件拆分多个小文件,那么我们就可以使用多个ByteBuffer,分别夺取大文件中的部分内容,这种就叫分散读。

如文件 data.txt 内容为

onetwothree

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import static com.netty.c1.ByteBufferUtil.debugAll;

public class TestScatteringReads {

    public static void main(String[] args) {

        try (RandomAccessFile file = new RandomAccessFile("data.txt", "rw")) {
            FileChannel channel = file.getChannel();
            ByteBuffer buff1 = ByteBuffer.allocate(3);
            ByteBuffer buff2 = ByteBuffer.allocate(3);
            ByteBuffer buff3 = ByteBuffer.allocate(5);
            channel.read(new ByteBuffer[]{buff1, buff2, buff3});
            buff1.flip();
            buff2.flip();
            buff3.flip();
            debugAll(buff1);
            debugAll(buff2);
            debugAll(buff3);
        } catch (IOException e) {
        }
    }
}
集中写 Gathering Writes

我们可以将多个ByteBuffer中的内容,集中写入到一个文件中。

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class TestGatheringWrite {

    public static void main(String[] args) {

        ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("Hello");
        ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("World");
        ByteBuffer buffer3 = StandardCharsets.UTF_8.encode("Sky");

        try (RandomAccessFile file = new RandomAccessFile("data2.txt", "rw")) {
            FileChannel channel = file.getChannel();
            channel.write(new ByteBuffer[]{buffer1, buffer2, buffer3});
        } catch (IOException e) {
        }

    }
}
粘包和半包

粘包产生原因:逐条消息发送效率低,若干条消息一起发送。
半包产生的原因:由于服务器缓冲区大小限制导致没有发送完整的包。

import java.nio.ByteBuffer;

import static com.netty.c1.ByteBufferUtil.debugAll;


public class TestByteBufferExam {

    public static void main(String[] args) {

        ByteBuffer source = ByteBuffer.allocate(32);
        // 模拟网络中消息的发送
        source.put("Hello,worldnI'm zhangsannHo".getBytes());
        split1(source);
        source.put("w are you?n".getBytes());
        split1(source);
    }

    private static void split(ByteBuffer source) {
        
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            if (source.get(i) == 'n') {
                int length = i - source.position() + 1;

                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; ++j) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
        source.compact();
    }

    private static void split1(ByteBuffer source) {

        source.flip();
        int oldLimit = source.limit();
        for (int i = 0; i < oldLimit; i++) {

            if (source.get(i) == 'n') {
                source.limit(i + 1);
                ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
                target.put(source);
                source.limit(oldLimit);
                debugAll(target);
            }
        }
        source.compact();
    }
}
文件编程 FileChannel

FileChannel只能工作在阻塞模式下,不能与selector一起使用。

  1. 获取FileChannel
    FileChannel不能直接打开,必须通过FileInputStream、FileOutputStream、RandomAccessFile来获取FileChannel,它们都有getChannel方法。

    • 通过FileInputStream获取的channel只能读
    • 通过FileOutputStream获取的channel只能写
    • 通过RandomAccessFile获取的channel,能否读写,取决于构造RandomAccessFile是的读写模式决定
  2. 读取
    从 channel 读取数据填充 ByteBuffer,返回值表示读取到多少字节,-1表示到达了文件末尾。

    int readBytes = channel.read(buffer);
    
  3. 写入
    将buffer中的数据写入channel的固定模式

    ByteBuffer buffer = ...;
    buffer.put(...);
    buffer.flip();
    
    while (buffer.hasRemaining()) {
        channel.write(buffer);
    }
    
  4. channel必须要关闭。但是调用FileInputStream、FileOutputStream、RandomAccessFile的close方法会间接调用channel的close方法。

  5. 强制写入
    操作系统出于性能的考虑,会将数据缓存,不会立即写入磁盘。只有当 channel 真正关闭时,才会同步到磁盘中。可以调用其force(true)方法将数据强制写入磁盘。

transferTo 传输数据
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;


public class TestFileChannelTransferTo {

   public static void main(String[] args) {

       try (FileChannel from = new FileInputStream("data.txt").getChannel();
            FileChannel to = new FileOutputStream("to.txt").getChannel()
           ) {
           from.transferTo(0, from.size(), to);
       } catch (IOException e) {
           e.printStackTrace();
       }
   }
}

文件大小超过2G后,需要多次传输:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;


public class TestFileChannelTransferTo {

    public static void main(String[] args) {

        try (FileChannel from = new FileInputStream("data.txt").getChannel();
             FileChannel to = new FileOutputStream("to.txt").getChannel()
            ) {

            long size = from.size();
            for (long left = size; left > 0; ) {
                left -= from.transferTo((size -left), left, to);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Path

jdk7 引入了 Path 和 Paths 类

  • Path 用来表示文件路径
  • Paths 是工具类,用来获取 Path 实例
import java.nio.file.Path;
import java.nio.file.Paths;

public class TestPath {

    public static void main(String[] args) {

        Path path = Paths.get("~");
        System.out.println(path.normalize());

        Path path1 = Paths.get("~", "file");
        System.out.println(path1.normalize());

        Path path2 = Paths.get("~","data/project/a/../b");
        System.out.println(path2);
        System.out.println(path2.normalize());
    }
}
Files
  1. 检查文件是否存在、创建一级目录、创建多级目录、拷贝文件、移动文件、删除文件

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    
    public class TestFiles {
    
        public static void main(String[] args) throws IOException {
    
            Path path = Paths.get("hello/data.txt");
            System.out.println(Files.exists(path));  // 检查文件是否存在
    
            Path path1 = Paths.get("hello");
            Files.createDirectory(path1);            // 创建一级目录
    
            Path path2 = Paths.get("hello/d1/d2");
            Files.createDirectories(path2);          // 创建多级目录
    
            Path source = Paths.get("data.txt");
            Path target = Paths.get("target.txt");
            
            
    
            Files.copy(source, target); // 如果目标文件存在,则会抛出FileAlreadyExistsException异常
    
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING); // 覆盖写,即使原文件存在,也会抛出异常
            
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE); // 文件重命名
            
            Files.delete(target);
        }
    }
    
  2. 遍历文件目录

    import java.io.IOException;
    import java.nio.file.*;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class TestWalkFileTree {
    
        public static void main(String[] args) throws IOException {
            Path path = Paths.get("/opt/java/jdk1.8.0_261");
            AtomicInteger dirCount = new AtomicInteger();
            AtomicInteger fileCount = new AtomicInteger();
            Files.walkFileTree(path, new SimpleFileVisitor(){
    
                @Override
                public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                    System.out.println(dir);
                    dirCount.incrementAndGet();
                    return super.preVisitDirectory(dir, attrs);
                }
    
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    System.out.println(file);
                    fileCount.incrementAndGet();
                    return super.visitFile(file, attrs);
                }
            });
    
            System.out.println(dirCount);
            System.out.println(fileCount);
        }
    }
    

    使用的是访问者模式。重写时,不要修改 return 语句内容。

  3. 统计目录下特定文件的数量,如Jar文件

    import java.io.IOException;
    import java.nio.file.*;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class TestCountJars {
    
        public static void main(String[] args) throws IOException {
            Path path = Paths.get("/opt/java/jdk1.8.0_261");
            AtomicInteger fileCount = new AtomicInteger();
            Files.walkFileTree(path, new SimpleFileVisitor() {
    
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    if (file.toFile().getName().endsWith(".jar")) {
                        fileCount.incrementAndGet();
                    }
                    return super.visitFile(file, attrs);
                }
            });
            System.out.println(fileCount);
        }
    }
    
  4. 删除目录(慎用,删除后不可恢复)

    import java.io.IOException;
    import java.nio.file.*;
    import java.nio.file.attribute.BasicFileAttributes;
    
    public class TestDeleteFileAndDir {
    
        public static void main(String[] args) throws IOException {
    
            Path path = Paths.get("xxxxx");
            Files.walkFileTree(path, new SimpleFileVisitor(){
    
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    Files.delete(file);
                    return super.visitFile(file, attrs);
                }
    
                @Override
                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                    Files.delete(dir);
                    return super.postVisitDirectory(dir, exc);
                }
            });
        }
    }
    
  5. 拷贝多级目录

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    
    public class TestCopyDir {
    
        public static void main(String[] args) throws IOException {
            
            String source = "源目录";
            String target = "目标目录";
    
            Files.walk(Paths.get(source)).forEach(path -> {
                try {
                    String targetName = path.toString().replace(source, target);
                    if (Files.isDirectory(path)) { // 目录
                        Files.createDirectory(Paths.get(targetName));
                    } else if (Files.isRegularFile(path)) {  // 普通文件
                        Files.copy(path, Paths.get(targetName));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
网络编程 阻塞 & 非阻塞
  1. 阻塞
    阻塞模式下,相关方法都会导致线程暂停,ServerSocketChannel.accept会在没有连接建立时让线程暂停,SocketChannel.read会在没有数据可读时让线程暂停,阻塞的表现其实就是线程暂停,暂停期间不会占用CPU,但线程相当于闲置。

    单线程阻塞模式下,服务端代码:

    import lombok.extern.slf4j.Slf4j;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.ArrayList;
    import java.util.List;
    
    import static com.netty.c1.ByteBufferUtil.debugRead;
    
    @Slf4j
    public class BlockServer {
    
        public static void main(String[] args) throws IOException {
            ByteBuffer buffer = ByteBuffer.allocate(16);
    
            ServerSocketChannel ssc = ServerSocketChannel.open();  
    
            ssc.bind(new InetSocketAddress(8080));
    
            List channels = new ArrayList<>();
    
            while (true) {
                log.debug("connecting...");
                SocketChannel sc = ssc.accept();  // 阻塞,等待客户端的连接
                log.debug("connected... {}", sc);
                channels.add(sc);
                for (SocketChannel channel : channels) {
                    log.debug("before read... {}", channel);
                    channel.read(buffer); // 阻塞,等待客户端发送数据
                    buffer.flip();
                    debugRead(buffer);
                    buffer.clear();
                    log.debug("after read...{}", channel);
                }
            }
        }
    }
    

    阻塞模式下,一个线程多个连接。阻塞方法的调用,会相互影响。当accept()方法被调用时候,read()方法就不能被调用。

  2. 非阻塞模式
    非阻塞模式下,相关的方法都不会让线程暂停。ServerSocketChannel.accept在没有连接建立时,会返回null,继续运行;SocketChannel.read在没有数据可读时,会返回0,但线程不阻塞,可以去执行其它的SocketChannel的read或是去执行ServerSocketChannel.accept;

    import lombok.extern.slf4j.Slf4j;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.ArrayList;
    import java.util.List;
    
    import static com.netty.c1.ByteBufferUtil.debugRead;
    
    @Slf4j
    public class NonBlockServer {
    
        public static void main(String[] args) throws IOException {
            ByteBuffer buffer = ByteBuffer.allocate(16);
    
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);  // 设置为非阻塞模式
            ssc.bind(new InetSocketAddress(8080));
    
            List channels = new ArrayList<>();
    
            while (true) {
                SocketChannel sc = ssc.accept();  // 非阻塞
                if (sc != null) {
                    sc.configureBlocking(false);  // 设置为非阻塞模式
                    channels.add(sc);
                }
                for (SocketChannel channel : channels) {
                    int read = channel.read(buffer);// 非阻塞
                    if (read > 0) {
                        buffer.flip();
                        debugRead(buffer);
                        buffer.clear();
                        log.debug("after read...{}", channel);
                    }
                }
            }
        }
    }
    

    单线程,非阻塞模式下。线程在一直循环,很多时候做无用功。导致CPU占用过高,白白浪费CPU资源。

  3. 多路复用
    单线程可以配合 Selector 完成对多个Channel可读事件的监控。这称之为多路复用。

    • 多路复用仅针对网络IO、普通文件IO没法利用多路复用
    • 如果不用Selector的非阻塞模式,线程大部分时间在做无用功,而Selector能够保证:有连接事件时才去连接,有可读事件时才去读取,有可写事件时采取写入。
Selector

使用Selector的好处:

  1. 一个线程配合Selector可以监控多个channel事件,事件发生线程才去处理。避免了非阻塞模式下所做的无用功。没有事件发生则阻塞,不占用CPU资源。
  2. 让这个线程能够被充分利用。
  3. 节约了线程资源。
  4. 减少了线程的上下文切换。
事件类型
  1. accept事件,会在有连接请求时触发
  2. connect事件,是客户端连接建立时触发
  3. read事件,可读事件
  4. write事件,可写事件
@Slf4j
public class SelectorServer {

    public static void main(String[] args) throws IOException {

        // 1. 创建selector,管理多个channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        // 2. 建立selector与serversocketchannel之间的联系
        SelectionKey sscKey = ssc.register(selector, 0, null);
        // 只关注accept事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key {}", sscKey);

        ssc.bind(new InetSocketAddress(8080));

        while (true) {
            selector.select();  // 没有事件发生时,线程阻塞,但有自己感兴趣的事件发生,线程被唤醒
            // selectedKeys 内部包含了所有要发生的事件
            //该迭代的集合中的key是在感兴趣事件发生时加入的,是一个新的集合
            Iterator iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {

                SelectionKey key = iterator.next();
                log.debug("key: {}", key);
                // 从selectedKeys中删除感兴趣的事件 注意这里的key是在事件发生后加入的,
                // 即调用select()方法后,感兴趣的事件发生后,select()方法解除阻塞后,加入到selectedKeys中的
                // 所以当事件处理完成后,这里的key要移除
                // selector上key是在 selector 与 channel发生联系时,加入到selector中
                // 主要 key 会在两个集合中出现。
                iterator.remove();

                if (key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("{}", sc);
                    log.debug("{}", scKey);
                } else if (key.isReadable()) {
                    try {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        int read = socketChannel.read(buffer);
                        if (read == -1) { // 客户端正常断开
                            key.cancel();
                        } else {
                            buffer.flip();
                            debugRead(buffer);
                            buffer.clear();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();  // 客户端异常断开后,因此需要将key取消(从selector的key集合中删除)
                    }
                }
            }
        }
    }
}
  1. 为什么要从 selectedKeys() 返回的集合中删除key,不删除会怎么样?
  2. 当处理读事件时,客户端正常断开和异常断开如何处理?
消息边界处理 读事件


消息边界处理:

  1. 当ByteBuffer的容量比较小,一次性不能放下一条完整消息时:此时需要对ByteBuffer扩容。
  2. 当ByteBuffer的容量大于最长的一条消息时,此时可能放不下第二条消息,可能产生半包问题:此时需要定义消息的边界,比如用n来分割一条消息。
  3. 对于半包或者粘包问题的另外一种处理方式:定义消息类型,消息长度,消息内容。根据消息的长度动态分配ByteBuffer的容量。TLV格式的消息或者LTV格式的消息。

处理读事件发生时,消息内容超出ByteBuffer容量限制的情况:

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

import static com.netty.c1.ByteBufferUtil.debugRead;


@Slf4j
public class MessageSideServer {

    public static void main(String[] args) throws IOException {
        // 1. 创建selector,管理多个channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        // 2. 建立selector与serversocketchannel之间的联系
        SelectionKey sscKey = ssc.register(selector, 0, null);
        SelectionKey selectionKey = sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key {}", selectionKey);

        ssc.bind(new InetSocketAddress(8080));
        while (true) {
            selector.select();
            Iterator iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(16);
                    SelectionKey scKey = socketChannel.register(selector, 0, byteBuffer);
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("key {}", scKey);
                } else if (key.isReadable()) {  // 当缓冲区放不下客户端发送的一条消息时候,会分多次读取,因此会多次触发读事件
                    try {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        int read = socketChannel.read(buffer);
                        if (read == -1) {  // 客户端正常断开
                            key.cancel();
                        } else {
                            if (buffer.position() == buffer.limit()) { // 当缓冲区中存放不下一条完整的消息时,需要对ByteBuffer缓冲区扩容
                                buffer.flip(); // 切换为读模式
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.limit() * 2);
                                newBuffer.put(buffer);
                                key.attach(newBuffer); // 让channel关联到新的ByteBuffer上
                            }
                            buffer.flip();
                            debugRead(buffer);
                        }
                    } catch (IOException e) {  // 客户端异常断开
                        e.printStackTrace();
                        key.cancel();
                    }

                }
            }
        }
    }
}

这里关键要理解attactment,首先要理解代码中 SelectionKey scKey = socketChannel.register(selector, 0, byteBuffer); register方法的第三个参数。这个参数是用于socketChannel发生读事件时,将相关的数据读入byteBuffer中。

写事件

服务端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

public class WriteServer {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        serverSocketChannel.bind(new InetSocketAddress(8080));

        while (true) {
            selector.select();
            Iterator iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 30000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    while (buffer.hasRemaining()) {
                        int write = socketChannel.write(buffer);
                        System.out.println(write);
                    }
                }
            }
        }
    }
}

客户端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class WriteClient {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));

        int count = 0;
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}

客户端,连接到服务端时,服务端在控制台的打印如下:

2588672
6220885
4190912
4125429
3994463
2226422
1113211
1047728
0
1047728
1047728
0
0
916762
0
0
1047728
0
0
0
0
0
0
0
0
432332

客户端控制台打印如下:

32768
65536
720896
1769472
2555904
2654155
3702731
4751307
5799883
6848459
7897035
8945611
9994187
11042763
12091339
13139915
14188491
15237067
16285643
17334219
18382795
19431371
20479947
21528523
22577099
23625675
24674251
25722827
26771403
27819979
28868555
29917131
30000000

从服务段的输出,我们可以看出一些问题:当写缓冲区满的时候,如果再往缓冲区写,就写不进去了,因此,会有0的情况返回,表示写入缓存区数据为0,并输出到控制台。但是由于buffer缓冲区里有数据存在,因此while循环会一直循环下去,直到写缓冲区有空间时,再次向缓存区写入数据。最终只有buffer缓存区的数据都写入完成后,while循环才会停止。

这里很明显的一个问题就是:当写缓冲区满的时候,无法写入缓冲区数据,但是while循环还是会继续执行,占用CPU。会导致当前线程无法去处理其他事件。也无法处理其他客户端的连接。

改进的服务端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;

public class WriteServer {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        serverSocketChannel.bind(new InetSocketAddress(8080));

        while (true) {
            selector.select();
            Iterator iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    SelectionKey sckey = socketChannel.register(selector, 0, null);
                    // socketChannel关联感兴趣的事件
                    sckey.interestOps(SelectionKey.OP_READ);

                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 30000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int write = socketChannel.write(buffer);
                    System.out.println(write);
                    if (buffer.hasRemaining()) {
                        // socketChannel上关联可写事件,并且不影响原来关注的事件
                        sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);
                        // 把未写完的buffer 挂到sckey上
                        sckey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    int write = socketChannel.write(buffer);
                    System.out.println(write);

                    if (!buffer.hasRemaining()) {
                        key.attach(null);
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                    }
                }
            }
        }
    }
}

主要改进的地方在于,如果数据一次性写不完,将原来的while循环写入,变成可写事件来处理。一旦缓存区可写时,便会写入。缓冲区不能写时,可以去处理其他的事件。而不是一直循环检测。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/684023.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号