栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty 学习之NIO学习

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty 学习之NIO学习

文章目录
  • Netty 学习NIO学习
    • 一 三大组件
      • 1.1 Channel
      • 1.2 Buffer
      • 1.3 Selector
        • 1.3.1 使用多线程技术
        • 1.3.2 使用线程池技术
        • 1.3.3 Selector 技术
    • 二 ByteBuffer
      • 2.1 基本知识了解
      • 2.2 基本结构
      • 2.3 基本方法
      • 2.4 Buffer基本结构
      • 2.5 Buffer方法
      • 2.6 粘包与半包
      • 2.7 简单使用
    • 三 Channel
      • 3.1 FileChannel
        • 3.1.1 基本知识
        • 3.1.2 基本结构
        • 3.1.2 基本方法
        • 3.1.4 基本适用
      • 3.2 ServerSocketChannel
        • 3.2.1 阻塞
        • 3.2.2 非阻塞
        • 3.2.3 基本结构
        • 3.2.4 基本方法
        • 3.2.5 AbstractSelectableChannel基本结构
        • 3.2.6 AbstractSelectableChannel 基本方法
        • 3.2.7 基本实现
    • 四 Selector
      • 4.1 Selector
        • 4.1.1 基本知识
        • 4.1.2 基本结构
        • 4.1.3 基本方法
      • 4.2 SelectionKey
        • 4.2.1 基本知识
        • 4.2.2 基本结构
        • 4.2.3 基本方法
      • 4.3 SelectorProvider
        • 4.3.1 基本知识
        • 4.3.2 基本结构
        • 4.3.3 基本方法
      • 4.4 基本用法
        • 4.4.1 Accpet事件
        • 4.4.2 Read事件
        • 4.4.3 Write事件
      • 4.5 多线程优化
    • 五 NIO与BIO
      • 5.1 Stream与Channel
      • 5.2 IO模型
        • 5.2.1 阻塞IO
        • 5.2.2 非阻塞IO
        • 5.2.3 多路复用
        • 5.2.4 异步IO

Netty 学习NIO学习 一 三大组件

Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理,简而言之,通道负责传输,缓冲区负责存储

1.1 Channel

1.2 Buffer

我们用的比较多的是ByteBuffer

1.3 Selector 1.3.1 使用多线程技术

  • 内存占用高,每个线程都需要占用一定的内存,当连接较多时,会开辟大量线程,导致占用大量内存
  • 线程上下文切换成本高
  • 只适合连接数少的场景,连接数过多,会导致创建很多线程,从而出现问题
1.3.2 使用线程池技术

  • 阻塞模式下,线程仅能处理一个连接,线程池中的线程获取任务(task)后,只有当其执行完任务之后(断开连接后),才会去获取并执行下一个任务

  • 若socke连接一直未断开,则其对应的线程无法处理其他socke连接

  • 短连接即建立连接发送请求并响应后就立即断开,使得线程池中的线程可以快速处理其他连接

    1.3.3 Selector 技术

  • selector 的作用就是配合一个线程来管理多个 channel(fileChannel因为是阻塞式的,所以无法使用selector),获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,当一个channel中没有执行任务时,可以去执行其他channel中的任务。适合连接数多,但流量较少的场景
  • 若事件未就绪,调用 selector 的 select() 方法会阻塞线程,直到 channel 发生了就绪事件。这些事件就绪后,select 方法就会返回这些事件交给 thread 来处理
二 ByteBuffer 2.1 基本知识了解
  • 读写单个字节的绝对和相对get和put方法。

  • 将连续字节序列从此缓冲区传输到数组中的相对bulk get方法。

  • bulk put字节数组或其他字节缓冲区中的连续字节序列传输到此缓冲区的相对bulk put方法。

  • 读取和写入其他原始类型的值的绝对和相对get和put方法,将它们转换为特定字节顺序的字节序列和从字节序列转换。

  • 创建视图缓冲区的方法,允许将字节缓冲区视为包含某些其他原始类型值的缓冲区; 和
    compacting 、 duplicating和slicing字节缓冲区的方法。

  • 字节缓冲区可以通过allocation (为缓冲区的内容分配空间)或通过wrapping现有字节数组wrapping到缓冲区中来创建。

2.2 基本结构
public abstract class ByteBuffer
    extends Buffer
    implements Comparable{
    //成员变量
    final byte[] hb;                
    final int offset;
    boolean isReadOnly; 
    
    //构造方法
    ByteBuffer(int mark, int pos, int lim, int cap,   // package-private
                 byte[] hb, int offset)
    {
        super(mark, pos, lim, cap);
        this.hb = hb;
        this.offset = offset;
    }
    //构造方法
    ByteBuffer(int mark, int pos, int lim, int cap) { // package-private
        this(mark, pos, lim, cap, null, 0);
    }
    }
2.3 基本方法
public static ByteBuffer allocateDirect(int capacity) {}
public static ByteBuffer allocate(int capacity) {}


public static ByteBuffer wrap(byte[] array,int offset, int length){}
public static ByteBuffer wrap(byte[] array) {}
2.4 Buffer基本结构
public abstract class Buffer {
    //mark <= position <= limit <= capacity
    //成员变量
 	private int mark = -1;
    //起始值
    private int position = 0;
    //最大值
    private int limit;
    //容量
    private int capacity;
    
     //构造器
    Buffer(int mark, int pos, int lim, int cap) {       // package-private
        if (cap < 0)
            throw new IllegalArgumentException("Negative capacity: " + cap);
        this.capacity = cap;
        limit(lim);
        position(pos);
        if (mark >= 0) {
            if (mark > pos)
                throw new IllegalArgumentException("mark > position: ("
                                                   + mark + " > " + pos + ")");
            this.mark = mark;
        }
    }
    
}
2.5 Buffer方法
public final Buffer limit(int newLimit) {}


public final Buffer position(int newPosition) {}


public final Buffer reset() {}


public final Buffer clear() {}



public final Buffer flip() {}


public final boolean hasRemaining() {}


public abstract boolean isReadOnly();


public final Buffer rewind() {}
2.6 粘包与半包
  • 粘包

    发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去

  • 半包
    接收方的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象

2.7 简单使用
package com.shu.ByteBuffer;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;


public class TestByteBuffer {
    public static void main(String[] args) {
        // 获得FileChannel
        try (FileChannel channel = new FileInputStream("shu.txt").getChannel()) {
            // 获得缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(10);
            int hasNext = 0;
            StringBuilder builder = new StringBuilder();
            while((hasNext = channel.read(buffer)) > 0) {
                // 切换模式 limit=position, position=0
                buffer.flip();
                // 当buffer中还有数据时,获取其中的数据
                while(buffer.hasRemaining()) {
                    builder.append((char)buffer.get());
                }
                // 切换模式 position=0, limit=capacity
                buffer.clear();
            }
            System.out.println(builder.toString());
        } catch (IOException e) {
        }
    }
}

三 Channel 3.1 FileChannel

FileChannel只能在阻塞模式下工作,所以无法搭配Selector

3.1.1 基本知识
  • 用于读取、写入、映射和操作文件的通道。
  • 文件通道是连接到文件的SeekableByteChannel 。 它在其文件中有一个当前位置,可以queried和modified 。
  • 文件本身包含一个可变长度的字节序列,可以读取和写入,并且可以查询其当前size 。
  • 当写入的字节超过其当前大小时,文件的大小会增加; 当文件被truncated时,文件的大小会减小。
  • 文件还可能有一些相关的元数据,例如访问权限、内容类型和上次修改时间; 这个类没有定义元数据访问的方法。
  • 除了熟悉的字节通道读取、写入和关闭操作外,该类还定义了以下特定于文件的操作:
  • 可以以不影响通道当前位置的方式在文件中的绝对位置read或written字节。
  • 文件的一个区域可以直接mapped到内存中; 对于大文件,这通常比调用通常的读取或写入方法更有效。
  • 对文件所做的更新可能会被forced out到底层存储设备,以确保在系统崩溃时不会丢失数据。
  • 字节可以从文件传输to some other channel , vice versa ,这种方式可以被许多操作系统优化为非常快速的直接传输到文件系统缓存或从文件系统缓存传输。
  • 文件的某个区域可能会被locked防止其他程序访问。
3.1.2 基本结构
public abstract class FileChannel
    extends AbstractInterruptibleChannel
    implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel
{
    //构造器
    protected FileChannel() { }
}
3.1.2 基本方法
public static FileChannel open(Path path, Set options,FileAttribute... attrs)throws IOException{}
public static FileChannel open(Path path, OpenOption... options)
        throws IOException{}



 public abstract int read(ByteBuffer dst) throws IOException;

 public abstract long read(ByteBuffer[] dsts, int offset, int length)
        throws IOException;

 public final long read(ByteBuffer[] dsts) throws IOException {
        return read(dsts, 0, dsts.length);
    }




public abstract int write(ByteBuffer src) throws IOException;

public abstract long write(ByteBuffer[] srcs, int offset, int length)
        throws IOException;

public final long write(ByteBuffer[] srcs) throws IOException {
        return write(srcs, 0, srcs.length);
    }



public abstract long position() throws IOException;


public abstract long transferTo(long position, long count,
                                    WritableByteChannel target)
        throws IOException;


 public abstract void force(boolean metaData) throws IOException;
3.1.4 基本适用
public class TestChannel {
    public static void main(String[] args){
        try (FileInputStream fis = new FileInputStream("shu.txt");
             FileOutputStream fos = new FileOutputStream("student.txt");
             FileChannel inputChannel = fis.getChannel();
             FileChannel outputChannel = fos.getChannel()) {
            long size = inputChannel.size();
            long capacity = inputChannel.size();
            // 分多次传输
            while (capacity > 0) {
                // transferTo返回值为传输了的字节数
                capacity -= inputChannel.transferTo(size-capacity, capacity, outputChannel);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
3.2 ServerSocketChannel 3.2.1 阻塞
  • 阻塞模式下,相关方法都会导致线程暂停
    • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
    • SocketChannel.read 会在通道中没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
  • 但多线程下,有新的问题,体现在以下方面
    • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
3.2.2 非阻塞
  • 可以通过ServerSocketChannel的configureBlocking(**false**)方法将获得连接设置为非阻塞的。此时若没有连接,accept会返回null
  • 可以通过SocketChannel的configureBlocking(**false**)方法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read会返回-1
3.2.3 基本结构
public abstract class ServerSocketChannel extends AbstractSelectableChannel
 implements NetworkChannel{
    //构造器
     protected ServerSocketChannel(SelectorProvider provider) {
        super(provider);
    }
    }
3.2.4 基本方法
 public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }




 public final ServerSocketChannel bind(SocketAddress local)
        throws IOException
    {
        return bind(local, 0);
    }



 public abstract SocketChannel accept() throws IOException;

3.2.5 AbstractSelectableChannel基本结构
public abstract class AbstractSelectableChannel extends SelectableChannel{
    
     // The provider that created this channel
    private final SelectorProvider provider;

    // Keys that have been created by registering this channel with selectors.
    // They are saved because if this channel is closed the keys must be
    // deregistered.  Protected by keyLock.
    //
    private SelectionKey[] keys = null;
    private int keyCount = 0;

    // Lock for key set and count
    private final Object keyLock = new Object();

    // Lock for registration and configureBlocking operations
    private final Object regLock = new Object();
	
    // Blocking mode, protected by regLock
    boolean blocking = true;
    
    
    protected AbstractSelectableChannel(SelectorProvider provider) {
        this.provider = provider;
    }
    }
3.2.6 AbstractSelectableChannel 基本方法
 public final SelectableChannel configureBlocking(boolean block)
        throws IOException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if (blocking == block)
                return this;
            if (block && havevalidKeys())
                throw new IllegalBlockingModeException();
            implConfigureBlocking(block);
            blocking = block;
        }
        return this;
    }
    


 public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        throws ClosedChannelException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if ((ops & ~validOps()) != 0)
                throw new IllegalArgumentException();
            if (blocking)
                throw new IllegalBlockingModeException();
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }
3.2.7 基本实现
public class Server {
    public static void main(String[] args) {
        // 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 获得服务器通道
        try(ServerSocketChannel server = ServerSocketChannel.open()) {
            // 为服务器通道绑定端口
            server.bind(new InetSocketAddress(8080));
            // 用户存放连接的集合
            ArrayList channels = new ArrayList<>();
            // 循环接收连接
            while (true) {
                // 设置为非阻塞模式,没有连接时返回null,不会阻塞线程
                server.configureBlocking(false);
                SocketChannel socketChannel = server.accept();
                // 通道不为空时才将连接放入到集合中
                if (socketChannel != null) {
                    System.out.println("after connecting...");
                    channels.add(socketChannel);
                }
                // 循环遍历集合中的连接
                for(SocketChannel channel : channels) {
                    // 处理通道中的数据
                    // 设置为非阻塞模式,若通道中没有数据,会返回0,不会阻塞线程
                    channel.configureBlocking(false);
                    int read = channel.read(buffer);
                    if(read > 0) {
                        buffer.flip();
                        ByteBufferUtil.debugRead(buffer);
                        buffer.clear();
                        System.out.println("after reading");
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


public class Client {
    public static void main(String[] args) {
        try (SocketChannel socketChannel = SocketChannel.open()) {
            // 建立连接
            socketChannel.connect(new InetSocketAddress("localhost", 8080));
            System.out.println("waiting...");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
四 Selector

Selector一般称为选择器,也可以翻译为多路复用器,是Java NIO核心组件之一,主要功能是用于检查一个或者多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个Channel(通道),当然也可以管理多个网络连接。

4.1 Selector 4.1.1 基本知识
  • 将通道设置为非阻塞模式,并注册到选择器中,并设置感兴趣的事件

  • channel 必须工作在非阻塞模式

  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用

  • connect - 客户端连接成功时触发

  • accept - 服务器端成功接受连接时触发

  • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况

  • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

  • SelectableChannel对象的多路复用器。可以通过调用该类的open方法来创建选择器,该方法将使用系统默认的selector provider来创建新的选择器。

  • 也可以通过调用自定义选择器提供者的openSelector方法来创建选择器。 选择器保持打开状态,直到通过其close方法关闭为止。

4.1.2 基本结构

public abstract class Selector implements Closeable {
     protected Selector() { }
}
4.1.3 基本方法
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}


 public abstract Selector wakeup();


public abstract void close() throws IOException;


public abstract int select() throws IOException;


public abstract Set selectedKeys();
4.2 SelectionKey 4.2.1 基本知识
  • 表示SelectableChannel注册到Selector令牌。
  • 每次使用选择器注册频道时,都会创建一个选择键。
  • 直到它被调用其取消一个关键保持有效cancel方法,通过关闭它的信道,或通过关闭它的选择器。
  • 取消一个键不会立即将它从它的选择器中删除; 它会被添加到选择器的取消键集中,以便在下一次选择操作期间移除。
  • 可以通过调用其isValid方法来测试密钥的有效性。
4.2.2 基本结构

public abstract class SelectionKey {
    protected SelectionKey() { }
}
4.2.3 基本方法
 public static final int OP_READ = 1 << 0;
 public static final int OP_WRITE = 1 << 2;
 public static final int OP_ConNECT = 1 << 3;
 public static final int OP_ACCEPT = 1 << 4;


 public abstract boolean isValid();



 public abstract void cancel();



 public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
}


 public final boolean isWritable() {
        return (readyOps() & OP_WRITE) != 0;
    }


public final boolean isConnectable() {
        return (readyOps() & OP_CONNECT) != 0;
    }



 public final boolean isAcceptable() {
        return (readyOps() & OP_ACCEPT) != 0;
    }
4.3 SelectorProvider 4.3.1 基本知识
  • 选择器和可选通道的服务提供者类。
  • 选择器提供程序是此类的具体子类,它具有零参数构造函数并实现下面指定的抽象方法。
  • Java 虚拟机的给定调用维护一个系统范围的默认提供程序实例,该实例由provider方法返回。 该方法的第一次调用将定位如下指定的默认提供程序。
  • 系统范围的默认提供程序由DatagramChannel 、 Pipe 、 Selector 、 ServerSocketChannel和SocketChannel类的静态打开方法使用。
  • System.inheritedChannel()方法也使用它。
  • 程序可以通过实例化该提供程序然后直接调用此类中定义的开放方法来使用默认提供程序以外的提供程序。
  • 此类中的所有方法对于多个并发线程使用都是安全的。
4.3.2 基本结构
public abstract class SelectorProvider {

    private static final Object lock = new Object();
    private static SelectorProvider provider = null;
    
    protected SelectorProvider() {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkPermission(new RuntimePermission("selectorProvider"));
    }
    }
4.3.3 基本方法
public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

  public abstract DatagramChannel openDatagramChannel(ProtocolFamily family)
        throws IOException;


 public abstract ServerSocketChannel openServerSocketChannel()
        throws IOException;


 public abstract SocketChannel openSocketChannel()
        throws IOException;



   public Channel inheritedChannel() throws IOException {
        return null;
   }
4.4 基本用法 4.4.1 Accpet事件
public class SelectServer {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 获得服务器通道
        try(ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(8080));
            // 创建选择器
            Selector selector = Selector.open();
            
            // 通道必须设置为非阻塞模式
            server.configureBlocking(false);
            // 将通道注册到选择器中,并设置感兴趣的事件
            server.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                // 若没有事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转
                // 返回值为就绪的事件个数
                int ready = selector.select();
                System.out.println("selector ready counts : " + ready);
                
                // 获取所有事件
                Set selectionKeys = selector.selectedKeys();
                
                // 使用迭代器遍历事件
                Iterator iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    
                    // 判断key的类型
                    if(key.isAcceptable()) {
                        // 获得key对应的channel
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        System.out.println("before accepting...");
                        
        				// 获取连接并处理,而且是必须处理,否则需要取消
                        SocketChannel socketChannel = channel.accept();
                        System.out.println("after accepting...");
                        
                        // 处理完毕后移除
                        iterator.remove();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
4.4.2 Read事件
public class SelectServer {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 获得服务器通道
        try(ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(8080));
            // 创建选择器
            Selector selector = Selector.open();
            // 通道必须设置为非阻塞模式
            server.configureBlocking(false);
            // 将通道注册到选择器中,并设置感兴趣的实践
            server.register(selector, SelectionKey.OP_ACCEPT);
            // 为serverKey设置感兴趣的事件
            while (true) {
                // 若没有事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转
                // 返回值为就绪的事件个数
                int ready = selector.select();
                System.out.println("selector ready counts : " + ready);
                // 获取所有事件
                Set selectionKeys = selector.selectedKeys();
                // 使用迭代器遍历事件
                Iterator iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // 判断key的类型
                    if(key.isAcceptable()) {
                        // 获得key对应的channel
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        System.out.println("before accepting...");
                        // 获取连接
                        SocketChannel socketChannel = channel.accept();
                        System.out.println("after accepting...");
                        // 设置为非阻塞模式,同时将连接的通道也注册到选择其中
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        // 处理完毕后移除
                        iterator.remove();
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        System.out.println("before reading...");
                        channel.read(buffer);
                        System.out.println("after reading...");
                        buffer.flip();
                        ByteBufferUtil.debugRead(buffer);
                        buffer.clear();
                        // 处理完毕后移除
                        iterator.remove();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
4.4.3 Write事件
public class WriteServer {
    public static void main(String[] args) {
        try(ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(8080));
            server.configureBlocking(false);
            Selector selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                selector.select();
                Set selectionKeys = selector.selectedKeys();
                Iterator iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // 处理后就移除事件
                    iterator.remove();
                    if (key.isAcceptable()) {
                        // 获得客户端的通道
                        SocketChannel socket = server.accept();
                        // 写入数据
                        StringBuilder builder = new StringBuilder();
                        for(int i = 0; i < 500000000; i++) {
                            builder.append("a");
                        }
                        ByteBuffer buffer = StandardCharsets.UTF_8.encode(builder.toString());
                        // 先执行一次Buffer->Channel的写入,如果未写完,就添加一个可写事件
                        int write = socket.write(buffer);
                        System.out.println(write);
                        // 通道中可能无法放入缓冲区中的所有数据
                        if (buffer.hasRemaining()) {
                            // 注册到Selector中,关注可写事件,并将buffer添加到key的附件中
                            socket.configureBlocking(false);
                            socket.register(selector, SelectionKey.OP_WRITE, buffer);
                        }
                    } else if (key.isWritable()) {
                        SocketChannel socket = (SocketChannel) key.channel();
                        // 获得buffer
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        // 执行写操作
                        int write = socket.write(buffer);
                        System.out.println(write);
                        // 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣
                        if (!buffer.hasRemaining()) {
                            key.attach(null);
                            key.interestOps(0);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
4.5 多线程优化
  • 思路:各个线程各司其职,像医院的看病流程一样,各个单位各司其职
public class ThreadsServer {
    public static void main(String[] args) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // 当前线程为Boss线程
            Thread.currentThread().setName("Boss");
            server.bind(new InetSocketAddress(8080));
            // 负责轮询Accept事件的Selector
            Selector boss = Selector.open();
            server.configureBlocking(false);
            server.register(boss, SelectionKey.OP_ACCEPT);
            // 创建固定数量的Worker
            Worker[] workers = new Worker[4];
            // 用于负载均衡的原子整数
            AtomicInteger robin = new AtomicInteger(0);
            for(int i = 0; i < workers.length; i++) {
                workers[i] = new Worker("worker-"+i);
            }
            while (true) {
                boss.select();
                Set selectionKeys = boss.selectedKeys();
                Iterator iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    // BossSelector负责Accept事件
                    if (key.isAcceptable()) {
                        // 建立连接
                        SocketChannel socket = server.accept();
                        System.out.println("connected...");
                        socket.configureBlocking(false);
                        // socket注册到Worker的Selector中
                        System.out.println("before read...");
                        // 负载均衡,轮询分配Worker
                        workers[robin.getAndIncrement()% workers.length].register(socket);
                        System.out.println("after read...");
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static class Worker implements Runnable {
        private Thread thread;
        private volatile Selector selector;
        private String name;
        private volatile boolean started = false;
        
        private ConcurrentlinkedQueue queue;

        public Worker(String name) {
            this.name = name;
        }

        public void register(final SocketChannel socket) throws IOException {
            // 只启动一次
            if (!started) {
                thread = new Thread(this, name);
                selector = Selector.open();
                queue = new ConcurrentlinkedQueue<>();
                thread.start();
                started = true;
            }
            
            // 向同步队列中添加SocketChannel的注册事件
            // 在Worker线程中执行注册事件
            queue.add(new Runnable() {
                @Override
                public void run() {
                    try {
                        socket.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
            // 唤醒被阻塞的Selector
            // select类似LockSupport中的park,wakeup的原理类似LockSupport中的unpark
            selector.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    // 通过同步队列获得任务并运行
                    Runnable task = queue.poll();
                    if (task != null) {
                        // 获得任务,执行注册操作
                        task.run();
                    }
                    Set selectionKeys = selector.selectedKeys();
                    Iterator iterator = selectionKeys.iterator();
                    while(iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        // Worker只负责Read事件
                        if (key.isReadable()) {
                            // 简化处理,省略细节
                            SocketChannel socket = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            socket.read(buffer);
                            buffer.flip();
                            ByteBufferUtil.debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
五 NIO与BIO 5.1 Stream与Channel
  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
  • 二者均为全双工,即读写可以同时进行
    • 虽然Stream是单向流动的,但是它也是全双工的
5.2 IO模型
  • 同步

    :线程自己去获取结果(一个线程)

    • 例如:线程调用一个方法后,需要等待方法返回结果
  • 异步

    :线程自己不去获取结果,而是由其它线程返回结果(至少两个线程)

    • 例如:线程A调用一个方法后,继续向下运行,运行结果由线程B返回

当调用一次 channel.read 或 stream.read 后,会由用户态切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段

  • 复制数据阶段

根据UNIX 网络编程 - 卷 I,IO模型主要有以下几种

5.2.1 阻塞IO

  • 用户线程进行read操作时,需要等待操作系统执行实际的read操作,此期间用户线程是被阻塞的,无法执行其他操作
5.2.2 非阻塞IO

  • 用户线程

    在一个循环中一直调用read方法

    ,若内核空间中还没有数据可读,立即返回

    • 只是在等待阶段非阻塞
  • 用户线程发现内核空间中有数据后,等待内核空间执行复制数据,待复制结束后返回结果

5.2.3 多路复用

Java中通过Selector实现多路复用

  • 当没有事件是,调用select方法会被阻塞住
  • 一旦有一个或多个事件发生后,就会处理对应的事件,从而实现多路复用

多路复用与阻塞IO的区别

  • 阻塞IO模式下,若线程因accept事件被阻塞,发生read事件后,仍需等待accept事件执行完成后,才能去处理read事件
  • 多路复用模式下,一个事件发生后,若另一个事件处于阻塞状态,不会影响该事件的执行
5.2.4 异步IO

  • 线程1调用方法后理解返回,不会被阻塞也不需要立即获取结果
  • 当方法的运行结果出来以后,由线程2将结果返回给线程1

参考博客:https://nyimac.gitee.io

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/488916.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号