栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Netty系列(1)Java NIO快速入门,一看就懂

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Netty系列(1)Java NIO快速入门,一看就懂

文章目录
  • 1. NIO介绍
    • 1.1 NIO 三大核心原理示意图
  • 2.缓冲区(Buffer)
    • 2.1 Buffer常用API介绍
      • 2.1.1 缓冲区对象创建
      • 2.1.2 缓冲区对象添加数据
      • 2.1.3 缓冲区对象读取数据
  • 3 Channel
    • 3.1 Channel常用实现类
    • 3.2 FileChannel
      • 3.2.1 FileChannel本地文件写数据
      • 3.2.2 FileChannel读取本地文件
      • 3.2.3 使用FileChannel进行文件copy
      • 3.2.4 使用transferFrom方法完成copy
      • 3.2.5 直接在内存中修改文件
    • 3.3 ServerSocketChannel和SocketChannel
  • 4 Selector
    • 4.1 Selector常用方法
    • 4.2 NIO 非阻塞网络编程原理
    • 4.3 NIO入门案例,实现服务端与客户端通信
  • 5 NIO网络编程应用实例-简单版群聊系统
    • 5.1 需求
    • 5.2 服务端代码
    • 5.3 客户端代码

1. NIO介绍

Java NIO 全称java non-blocking IO ,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系
列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的。

  1. NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
  2. NIO是 面向缓冲区编程的。数据读取到一个缓冲区中,需要时可在缓冲区中前后移动,这就增加了 处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
  3. Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的 数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可 以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某 通道,但不需要等待它完全写入, 这个线程同时可以去做别的事情。通俗理解:NIO 是可以做到 用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配50 或者 100 个 线程来处理。不像之前的阻塞 IO 那样,非得分配 10000
1.1 NIO 三大核心原理示意图

  • 每个 channel 都会对应一个 Buffer
  • Selector 对应一个线程, 一个线程对应多个 channel(连接)
  • 每个 channel 都注册到 Selector选择器上
  • Selector不断轮询查看Channel上的事件, 事件是通道Channel非常重要的概念
  • Selector 会根据不同的事件,完成不同的处理操作
  • Buffer 就是一个内存块 , 底层是有一个数组
  • 数据的读取写入是通过 Buffer, 这个和 BIO , BIO 中要么是输入流,或者是输出流, 不能双向,但是
  • NIO 的 Buffer 是可以读也可以写 , channel 是双向的
2.缓冲区(Buffer)

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个数组,该对象
提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的
状态变化情况。Channel 提供从网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer.

2.1 Buffer常用API介绍

在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,常用的缓冲区分别对应 byte,short, int, long,float,double,char 7种。

2.1.1 缓冲区对象创建
  • public static ByteBuffer allocate(int capacity) 创建byte类型的指定长度的缓冲区
  • public static ByteBuffer wrap(byte[] array)创建一个有内容的byte类型缓冲区
//创建一个指定长度的缓冲区
ByteBuffer buffer=ByteBuffer.allocate(10);


// 创建一个有内容的byte类型缓冲区
ByteBuffer wrap = ByteBuffer.wrap("hello word".getBytes());
2.1.2 缓冲区对象添加数据
  • 常用方法

    • int position()/position(int newPosition) 获得当前要操作的索引/修改当前要操作的索引位 置
    • int limit()/limit(int newLimit) 最多能操作到哪个索引或者修改最多能操作的索引位 置
    • int capacity() 返回缓冲区的总长度
    • int remaining() 还有多少能操作索引个数
    • put(byte b)/put(byte[] src) 添加一个字节/添加字节数组
  • buffer添加数据原理图解

  • 代码演示

    public static void main(String[] args) {
    
        //1.创建一个指定长度的缓冲区,
        ByteBuffer byteBuffer = ByteBuffer.allocate(10);
        System.out.println(byteBuffer.position());//0 获取当前索引所在位置
        System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
        System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
        System.out.println(byteBuffer.remaining());//10 还有多少个能操作
          //添加一个字节
        byteBuffer.put((byte) 97);
        System.out.println(byteBuffer.position());//1 获取当前索引所在位置
        System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
        System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
        System.out.println(byteBuffer.remaining());//9 还有多少个能操作
       //添加一个字节数组
        byteBuffer.put("abc".getBytes());
        System.out.println(byteBuffer.position());//4 获取当前索引所在位置
        System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
        System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
        System.out.println(byteBuffer.remaining());//6 还有多少个能操作
        //当添加超过缓冲区的长度时会报错
        byteBuffer.put("012345".getBytes());
        System.out.println(byteBuffer.position());//10 获取当前索引所在位置
        System.out.println(byteBuffer.limit());//10 最多能操作到哪个索引
        System.out.println(byteBuffer.capacity());//10 返回缓冲区总长度
        System.out.println(byteBuffer.remaining());//0 还有多少个能操作
        System.out.println(byteBuffer.hasRemaining());// false 是否还能有操作的数组
       // 如果缓存区存满后, 可以调整position位置可以重复写,这样会覆盖之前存入索引的对应的值
        byteBuffer.position(0);
        byteBuffer.put("012345".getBytes());
    
    
    }
    
2.1.3 缓冲区对象读取数据
  • 常用方法

    • public final Buffer flip() 写切换读模式 limit设置position位置, position设置0
    • public byte get() 读一个字节
    • get(byte[] dst) 读多个字节
    • get(int index) 读指定索引的字节
    • rewind() 将position设置为0,可以重复读
    • clear() 切换写模式 position设置为0 , limit 设置为 capacity
    • array() 将缓冲区转换成字节数组返回
  • flip()方法原理图解

  • clear方法图解

  • 代码示例

    public static void main(String[] args) {
        //1.创建一个指定长度的缓冲区
        ByteBuffer allocate = ByteBuffer.allocate(10);
        allocate.put("0123".getBytes());
        System.out.println("position:" + allocate.position());//4
        System.out.println("limit:" + allocate.limit());//10
        System.out.println("capacity:" + allocate.capacity());//10
        System.out.println("remaining:" + allocate.remaining());//6
        //切换读模式
        System.out.println("读取数据--------------");
        allocate.flip();
        System.out.println("position:" + allocate.position());//4
        System.out.println("limit:" + allocate.limit());//10
        System.out.println("capacity:" + allocate.capacity());//10
        System.out.println("remaining:" + allocate.remaining());//6
        for (int i = 0; i < allocate.limit(); i++) {
            System.out.println(allocate.get());
        }
        //读取完毕后.继续读取会报错,超过limit值
        // System.out.println(allocate.get());
       //读取指定索引字节
        System.out.println("读取指定索引字节--------------");
        System.out.println(allocate.get(1));
        System.out.println("读取多个字节--------------");
       // 重复读取
        allocate.rewind();
        byte[] bytes = new byte[4];
        allocate.get(bytes);
        System.out.println(new String(bytes));
        // 将缓冲区转化字节数组返回
        System.out.println("将缓冲区转化字节数组返回--------------");
        byte[] array = allocate.array();
        System.out.println(new String(array));
         // 切换写模式,覆盖之前索引所在位置的值
        System.out.println("写模式--------------");
        allocate.clear();
        allocate.put("abc".getBytes());
        System.out.println(new String(allocate.array()));
    
    }
    
3 Channel

通常来说NIO中的所有IO都是从 Channel(通道) 开始的。NIO 的通道类似于流,但有些区别如下:

  1. 通道可以读也可以写,流一般来说是单向的(只能读或者写,所以之前我们用流进行IO操作的时候 需要分别创建一个输入流和一个输出流);
  2. 通道可以异步读写;
  3. 通道总是基于缓冲区Buffer来读写。
3.1 Channel常用实现类

常 用 的Channel实现类类 有 :FileChannel , DatagramChannel ,ServerSocketChannel和 SocketChannel 。

  • FileChannel 用于文件的数据读写,

  • DatagramChannel 用于 UDP 的数据读 写,

  • ServerSocketChannel 和SocketChannel 用于 TCP 的数据读写。

3.2 FileChannel

FileChannel 主要用来对本地文件进行 IO 操作,常见的方法有:

  • public int read(ByteBuffer dst),从通道读取数据并放到缓冲区中
  • public int write(ByteBuffer src),把缓冲区的数据写到通道中
  • public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道中复制数据到当前通道
  • public long transferTo(long position, long count, WritableByteChannel target),把数据从当前通道复制给目标通道
  • public abstract MappedByteBuffer map(MapMode mode,long position, long size) 可以让文件直接在内存中被修改,操作系统不需要在拷贝一份
3.2.1 FileChannel本地文件写数据
static void write() throws IOException {
    String str="hello word word";
    FileOutputStream fileOutputStream=new FileOutputStream("1.txt");
    FileChannel channel = fileOutputStream.getChannel();
    ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
    byteBuffer.put(str.getBytes(StandardCharsets.UTF_8));
    byteBuffer.flip();
    //channel.write(ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)));
    channel.write(byteBuffer);
    fileOutputStream.close();
}
3.2.2 FileChannel读取本地文件
static void read() throws IOException{
    FileInputStream fileInputStream=new FileInputStream("d:\1.txt");
    FileChannel channel = fileInputStream.getChannel();
    ByteBuffer byteBuffer=ByteBuffer.allocate(1024);

    int read=channel.read(byteBuffer);

    System.out.println(new String(byteBuffer.array(),0,read));

    fileInputStream.close();
}
3.2.3 使用FileChannel进行文件copy
    static void copy() throws IOException{

        FileOutputStream fileOutputStream=new FileOutputStream("2.txt");
        FileInputStream fileInputStream=new FileInputStream("1.txt");
        FileChannel channelRead = fileInputStream.getChannel();
        FileChannel channelWrite = fileOutputStream.getChannel();
        ByteBuffer byteBuffer=ByteBuffer.allocate(50);
        while (true){
            //重置 buffer
            byteBuffer.clear();
            int read = channelRead.read(byteBuffer);
            if (read==-1){
                break;
            }
            byteBuffer.flip();
            channelWrite.write(byteBuffer);
        }
        fileInputStream.close();
        fileOutputStream.close();
    }
3.2.4 使用transferFrom方法完成copy
static void copyTransferFrom() throws IOException{
    FileOutputStream fileOutputStream=new FileOutputStream("3.txt");

    FileInputStream fileInputStream=new FileInputStream("1.txt");

    FileChannel sourceChannel = fileInputStream.getChannel();

    FileChannel distWrite = fileOutputStream.getChannel();

    distWrite.transferFrom(sourceChannel,0,sourceChannel.size());
}
3.2.5 直接在内存中修改文件
    
    static void mappedByteBufferTest()throws Exception{
        RandomAccessFile randomAccessFile=new RandomAccessFile("1.txt","rw");
        FileChannel channel = randomAccessFile.getChannel();
        
        MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
        //把第0个字符修改为H
        mappedByteBuffer.put(0,(byte) 'H');
        //关闭文件流
        randomAccessFile.close();
    }
3.3 ServerSocketChannel和SocketChannel

使用ServerSocketChannel和SocketChannel实现服务端和客户端通信

  1. ServerSocketChannel 服务端实现

    package com.warybee.channel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;
    
    
    public class NIOServerSocketChannelServer {
        
        public static void main(String[] args) throws IOException, InterruptedException {
            //1、打开一个服务端通道
            ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
            //2、绑定对应的端口号
            serverSocketChannel.bind(new InetSocketAddress(9999));
            //3. 通道默认是阻塞的,需要设置为非阻塞   true 为通道阻塞 false 为非阻塞
            serverSocketChannel.configureBlocking(false);
            System.out.println("服务端启动成功..........");
            //4. 检查是否有客户端连接 有客户端连接会返回对应的通道
            while (true){
                SocketChannel accept = serverSocketChannel.accept();
                if (accept==null){
                    System.out.println("没有客户端连接...我去做别的事情");
                    Thread.sleep(2000);
                    continue;
                }
                //5. 获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
                ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                //返回值:
                //正数: 表示本次读到的有效字节个数.
                 //0 : 表示本次没有读到有效字节.
                 //-1 : 表示读到了末尾
                int read=accept.read(byteBuffer);
                System.out.println(new String(byteBuffer.array(),0,read, StandardCharsets.UTF_8));
                //6. 给客户端回写数据
                accept.write(ByteBuffer.wrap("不要一直固执下去好吗".getBytes(StandardCharsets.UTF_8)));
                //7. 释放资源
                accept.close();
            }
    
        }
    }
    
  2. SocketChannel客户端实现

    package com.warybee.channel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;
    
    
    public class NIOClient {
    
        
        public static void main(String[] args) throws IOException {
            //1. 打开通道
            SocketChannel socketChannel=SocketChannel.open();
            //2、设置连接IP和端口号
            socketChannel.connect(new InetSocketAddress("127.0.0.1",9999));
            //3、写出数据
            socketChannel.write(ByteBuffer.wrap("我要给你一百元".getBytes(StandardCharsets.UTF_8)));
    
            //4.读取服务器写回的数据
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int read=socketChannel.read(readBuffer);
            System.out.println("服务端消息:" + new String(readBuffer.array(), 0, read,
                    StandardCharsets.UTF_8));
    
            //5、释放资源
            socketChannel.close();
    
    
        }
    }
    
4 Selector

可以用一个线程,处理多个的客户端连接,就会使用到NIO的Selector(选择器). Selector 能够检测 多个注册的服务端通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的 处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。

在这种没有选择器的情况下,对应每个连接对应一个处理线程. 但是连接并不能马上就会发送信息,所以还 会产生资源浪费

只有在通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都 创建一个线程,不用去维护多个线程, 避免了多线程之间的上下文切换导致的开销

4.1 Selector常用方法
  • public static Selector open() 得到一个选择器对象
  • public abstract int select(long timeout) 监控所有注册的通道,当其有IO操作可以进行时,将对应的SelectionKey加入到集合内部并返回。参数用来设置其超时时间 。阻塞方法
  • public abstract Set selectedKeys() 返回SelectionKey的集合
  • public abstract Selector wakeup() 唤醒Selector
  • public abstract int selectNow() 不阻塞,立马返回

SelectionKey

SelectionKey,表示 Selector 和网络通道的注册关系

  • 常用方法

    • SelectionKey.isAcceptable(): 是否是连接继续事件
    • SelectionKey.isConnectable(): 是否是连接就绪事件
    • SelectionKey.isReadable(): 是否是读就绪事件
    • SelectionKey.isWritable(): 是否是写就绪事件
    • public abstract Selector selector();得到与之关联的Selector
    • public abstract SelectableChannel channel(); 得到与之关联的Channel
    • public final Object attachment() 得到与之关联的共享数据
    • public abstract SelectionKey interestOps(int ops); 设置或改变监听事件
  • SelectionKey中定义的4种事件

    • SelectionKey.OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器 可以接收这个连接了

    • SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户端与服务器的连接已经建立成功

    • SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操 作了(通道目前有数据,可以进行读操作了)

    • SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用 于写操作)

4.2 NIO 非阻塞网络编程原理

NIO 非阻塞网络编程相关的(Selector、SelectionKey、ServerScoketChannel 和 SocketChannel)关系图

  1. 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel。
  2. Selector 进行监听 select 方法,返回有事件发生的通道的个数。
  3. 将 socketChannel 注册到 Selector 上,register(Selector sel, int ops),一个 Selector 上可以注册多个 SocketChannel。
  4. 注册后返回一个 SelectionKey,会和该 Selector 关联(集合)。
  5. 进一步得到各个 SelectionKey(有事件发生)。
  6. 在通过 SelectionKey 反向获取 SocketChannel,方法 channel()。
  7. 可以通过得到的 channel,完成业务处理。
4.3 NIO入门案例,实现服务端与客户端通信
  • 服务端

    public static void main(String[] args) throws IOException {
        //1. 打开一个服务端通道
        ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
        //2. 绑定对应的端口号
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //3. 通道默认是阻塞的,需要设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 4. 创建选择器
        Selector selector = Selector.open();
        //5. 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true){
            //6.阻塞1秒,检查选择器是否有事件,没有就返回
            int select = selector.select(1000);
            if (select==0){
                System.out.println("服务端等待了1秒,没有事件发生");
                continue;
            }
            //7. 获取事件集合
            Set selectionKeys = selector.selectedKeys();
            Iterator iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                //8. 判断事件是否是客户端连接事件SelectionKey.isAcceptable()
                if (key.isAcceptable()){
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客户端已连接....."+socketChannel);
                    必须设置通道为非阻塞, 因为selector需要轮询监听每个通道的事件
                    socketChannel.configureBlocking(false);
                    //9. 得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
                    socketChannel.register(selector,SelectionKey.OP_READ);
    
                }
                //10. 判断是否是客户端读就绪事件SelectionKey.isReadable()
                if (key.isReadable()){
                    //11. 得到客户端通道,读取数据到缓冲区
                   SocketChannel socketChannel= (SocketChannel)key.channel();
                    ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                    int read = socketChannel.read(byteBuffer);
                    if (read>0){
                        System.out.println("客户端消息:"+
                                new String(byteBuffer.array(),0,read, StandardCharsets.UTF_8));
                        //12.给客户端回写数据
                        socketChannel.write(ByteBuffer.wrap("没钱".getBytes(StandardCharsets.UTF_8)));
                        socketChannel.close();
                    }
                }
                //13. 从集合中删除对应的事件, 因为防止二次处理.
                iterator.remove();
            }
        }
    }
    
  • 客户端

    public static void main(String[] args) throws IOException {
        //1. 打开通道
        SocketChannel socketChannel=SocketChannel.open();
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //2、设置连接IP和端口号
        socketChannel.connect(new InetSocketAddress("127.0.0.1",9999));
        //3、写出数据
        socketChannel.write(ByteBuffer.wrap("我要给你借一百元".getBytes(StandardCharsets.UTF_8)));
    
        //4.读取服务器写回的数据
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        int read=socketChannel.read(readBuffer);
        System.out.println("服务端消息:" + new String(readBuffer.array(), 0, read,
                StandardCharsets.UTF_8));
    
        //5、释放资源
        socketChannel.close();
    
    
    }
    
5 NIO网络编程应用实例-简单版群聊系统 5.1 需求
  • 实现多人群聊
  • 服务器端:可以监测用户上线,离线,并实现消息转发功能
  • 客户端:通过 Channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
5.2 服务端代码
package com.warybee.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;


public class GroupChatServer {

     private static final int PORT=9999;

     private Selector selector;
     private ServerSocketChannel listenChannel;

     public GroupChatServer(){
         try {
             //获取一个Selector
             selector=selector.open();
             //打开一个服务端通道
             listenChannel=listenChannel.open();
             //绑定一个端口
             listenChannel.bind(new InetSocketAddress(PORT));
             listenChannel.configureBlocking(false);
             //注册到selector
             listenChannel.register(selector, SelectionKey.OP_ACCEPT);

         } catch (IOException e) {
             e.printStackTrace();
         }
     }

    
    public void listen(){
          while (true){
              try {
                  //阻塞一秒后是否有链接事件
                  int select = selector.select(1000);
                  if (select==0){
                      System.out.println("等到客户端链接...!");
                      continue;
                  }
                  Set selectionKeys = selector.selectedKeys();
                  Iterator iterator = selectionKeys.iterator();
                  while (iterator.hasNext()){
                      SelectionKey key = iterator.next();
                      //有链接状态
                      if (key.isAcceptable()){
                          SocketChannel socketChannel = listenChannel.accept();
                          //设置为非阻塞
                          socketChannel.configureBlocking(false);
                          //注册到selector
                          socketChannel.register(selector,SelectionKey.OP_READ);
                          System.out.println(socketChannel.getRemoteAddress()+" 上线");
                      }
                      //通道发送read事件,即通道是可读的状态
                      if (key.isReadable()){
                          readData(key);
                      }
                      //当前的 key 删除,防止重复处理
                      iterator.remove();
                  }
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
     }

    
     private void readData(SelectionKey key){
         SocketChannel socketChannel=null;
         try {
             socketChannel= (SocketChannel)key.channel();
             ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
             int read = socketChannel.read(byteBuffer);
             //客户端消息
             String msg=new String(byteBuffer.array(),0,read, StandardCharsets.UTF_8);
             System.out.println("from 客户端:"+msg);
             //把消息转发到其他客户端
             sendMsgOtherClient(msg,socketChannel);
         } catch (IOException e) {
             try {
                 System.out.println(socketChannel.getRemoteAddress() + "离线了..");
                 //取消注册
                 key.cancel();
                 //关闭通道
                 socketChannel.close();
             } catch (IOException e2) {
                 e2.printStackTrace();
             }
         }
     }

    
    private void sendMsgOtherClient(String msg,SocketChannel self) throws IOException {
        //获取所有链接通道
        Iterator iterator = selector.keys().iterator();
        while (iterator.hasNext()){
            SelectionKey key = iterator.next();
            Channel channel = key.channel();
            if (channel instanceof SocketChannel&&channel!=self){
                //转型
                SocketChannel dest = (SocketChannel) channel;
                dest.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
            }
        }
    }

    public static void main(String[] args) {
          GroupChatServer groupChatServer=new GroupChatServer();
          groupChatServer.listen();
    }
}
5.3 客户端代码
package com.warybee.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;


public class GroupChatClient {

       private static final String HOST="127.0.0.1";
       private static final int PORT=9999;
       private Selector selector;
       private SocketChannel socketChannel;
       private String username;
       public GroupChatClient(){
           try {
               selector=selector.open();
               socketChannel=socketChannel.open(new InetSocketAddress(HOST,PORT));
               //设置非阻塞
               socketChannel.configureBlocking(false);
               //注册到selector
               socketChannel.register(selector, SelectionKey.OP_READ);
               //得到 username
               username = socketChannel.getLocalAddress().toString().substring(1);
               System.out.println(username + " is ok...");
           } catch (IOException e) {
               e.printStackTrace();
           }
       }

    
    public void send(String msg){
           msg = username + " 说:" + msg;
           try {
               socketChannel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
           } catch (IOException e) {
               e.printStackTrace();
           }
    }

    
    public void receiveMsg(){
        try {
            int read = selector.select();
            //有可用通道
            if (read!=0){
                Iterator iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    if (key.isReadable()){
                        SocketChannel channel = (SocketChannel)key.channel();
                        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
                        int count = channel.read(byteBuffer);
                        //把读到的缓冲区的数据转成字符串
                        String msg = new String(byteBuffer.array(),0,count,StandardCharsets.UTF_8);
                        System.out.println(msg.trim());
                    }

                }
                删除当前的 selectionKey,防止重复操作
                iterator.remove();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        GroupChatClient chatClient=new GroupChatClient();
        new Thread(()->{
            //接受消息
            chatClient.receiveMsg();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        Scanner scanner=new Scanner(System.in);
        while (scanner.hasNextLine()){
            String s = scanner.nextLine();
            chatClient.send(s);
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336744.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号