栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

一文带你由浅入深Netty异步非阻塞世界

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

一文带你由浅入深Netty异步非阻塞世界

网络编程概念

两个程序通过一个双向的通信连接实现数据的交换,连接的一端称为一个socket

  • Socket是一个语言无标准,可以实现网络编程语言都有Socket
  • 通过·IP+Port通信
  • BIO、NIO、AIO适用场景

BIO:连接数少且固定的框架

NIO: 连接数多且连接时间短

AIO(NIO.2): 连接数多且连接时间长

Java IO流程图

Socket连接步骤
  1. 服务器监听
  2. 客户端请求
  3. 连接确定
  • Tips:连接的时候三次握手,断开连接四次挥手
同步和异步(OS底层操作)
  • 同步:使用同步IO时,Java自己处理IO读写
  • 异步:使用异步Io时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小给OS(用户数据),OS需要支持异步IO操作API
阻塞和非阻塞(程序阻塞代码块)
  • 阻塞:使用阻塞IO时,Java调用会一直阻塞到读写完成才返回。
  • 非阻塞:使用非阻塞IO时,如果不能读写Java调用会马上返回,当IO事件分发器通知可读写时再继续进行读写,不循环直到读写完成。
BIO编程
  • Blocking IO:同步阻塞的编程方式
  • BIO编程方式通常是在JDK1.4版本之前常用的编程方式。
  • 编程实现过程:首先服务端启动一个ServerSocket来监听网络请求,客户端启动Socket发起网络请求,默认情况下ServerSocket会建立一个线程来处理这个请求,如果服务端没有线程可用,客户端会阻塞等待或遭到拒绝。(可以加入线程池)

  • 改善

阻塞BIO案例

Server:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public final class ServerNormal {
	//默认的端口号
	private static int DEFAULT_PORT = 12345;
	//单例的ServerSocket
	private static ServerSocket server;
	//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
	public static void start() throws IOException{
		//使用默认值
		start(DEFAULT_PORT);
	}
	//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
	public synchronized static void start(int port) throws IOException{
		if(server != null) return;
		try{
			//通过构造函数创建ServerSocket
			//如果端口合法且空闲,服务端就监听成功
			server = new ServerSocket(port);
			System.out.println("服务器已启动,端口号:" + port);
			//通过无线循环监听客户端连接
			//如果没有客户端接入,将阻塞在accept操作上。
			while(true){
				Socket socket = server.accept();
				//当有新的客户端接入时,会执行下面的代码
				//然后创建一个新的线程处理这条Socket链路
				new Thread(new ServerHandler(socket)).start();
			}
		}finally{
			//一些必要的清理工作
			if(server != null){
				System.out.println("服务器已关闭。");
				server.close();
				server = null;
			}
		}
	}
}

ServerHandler

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
 
import com.anxpp.io.utils.Calculator;

public class ServerHandler implements Runnable{
	private Socket socket;
	public ServerHandler(Socket socket) {
		this.socket = socket;
	}
	@Override
	public void run() {
		BufferedReader in = null;
		PrintWriter out = null;
		try{
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(),true);
			String expression;
			String result;
			while(true){
				//通过BufferedReader读取一行
				//如果已经读到输入流尾部,返回null,退出循环
				//如果得到非空值,就尝试计算结果并返回
				if((expression = in.readLine())==null) break;
				System.out.println("服务器收到消息:" + expression);
				try{
					result = Calculator.cal(expression).toString();
				}catch(Exception e){
					result = "计算错误:" + e.getMessage();
				}
				out.println(result);
			}
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			//一些必要的清理工作
			if(in != null){
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
				in = null;
			}
			if(out != null){
				out.close();
				out = null;
			}
			if(socket != null){
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
				socket = null;
			}
		}
	}
}

client

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class Client {
	//默认的端口号
	private static int DEFAULT_SERVER_PORT = 12345;
	private static String DEFAULT_SERVER_IP = "127.0.0.1";
	public static void send(String expression){
		send(DEFAULT_SERVER_PORT,expression);
	}
	public static void send(int port,String expression){
		System.out.println("算术表达式为:" + expression);
		Socket socket = null;
		BufferedReader in = null;
		PrintWriter out = null;
		try{
			socket = new Socket(DEFAULT_SERVER_IP,port);
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(),true);
			out.println(expression);
			System.out.println("___结果为:" + in.readLine());
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			//一下必要的清理工作
			if(in != null){
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
				in = null;
			}
			if(out != null){
				out.close();
				out = null;
			}
			if(socket != null){
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
				socket = null;
			}
		}
	}
}

测试代码,放在一个程序中

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.util.Random;

public class Test {
	//测试主方法
	public static void main(String[] args) throws InterruptedException {
		//运行服务器
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					ServerBetter.start();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}).start();
		//避免客户端先于服务器启动前执行代码
		Thread.sleep(100);
		//运行客户端 
		char operators[] = {'+','-','*','/'};
		Random random = new Random(System.currentTimeMillis());
		new Thread(new Runnable() {
			@SuppressWarnings("static-access")
			@Override
			public void run() {
				while(true){
					//随机产生算术表达式
					String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
					Client.send(expression);
					try {
						Thread.currentThread().sleep(random.nextInt(1000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}).start();
	}
}
NIO编程
  • Ubblocking(New IO):同步阻塞的编程方式
  • 主要解决BIO的大并发问题,NIO基于Reactor,面向Buffer(缓存区)编程
  • 不能完全解决BIO上的问题,当并发上来的话,还是会有BIO一样的问题

  • 同步非阻塞,服务器实现模式为一个请求一个通道,即客户端发送的连接请求都会注册到多路复用器器上,多路复用器轮询到有I/O操作请求时才启动一个线程进行处理
  • NIO方式使用于连接数目多且比较短(轻操作)的架构,比如:聊天服务器,并发局限于应用中,编程复杂,JDK1.8开始支持
  • NIO核心三大组件:Selector(多路复用器)、Channel(通道)、Buffer(缓冲区)
  • NIO核心三大组件之间的关系
1.每个 channel都会对应一个 Buffer
2.Selector对应一个线程,一个线程对应多个 channel(连接)
3.该图反应了有三个 channel?注册到该 selector/程序
4.程序切换到哪个 channel是有事件决定的, Event就是一个重要的概念
5.Selector会根据不同的事件,在各个通道上切换
6.Buffer就是一个内存块,底层是有一个数组
7.数据的读取写入是通过 Buffer,这个和BIO,BIO中要么是输入流,或者是输出流,不能双向,但是NIO的 Buffer是可以读也可以写,需要ip方法切换
8.channel是双向的,可以返回底层操作系统的情况,比如 Linux,底层的操作系通道就是双向的
  • Buffer有7个子类(没有BooleanBuffer):ByteBuffer、IntBuffer、CharBuffer、LongBuffer、DoubleBuffer、FloatBuffer、ShortBuffer
  1. 最常用的是ByteBuffer
  • Buffer重要的四个属性:
  1. mark:标志位
  2. position:下标指针
  3. limit:当前缓冲区的终点
  4. capacity:缓冲区容量
  • flip通过改变这四个属性的值达到反转Buffer状态的功能

Buffer常用方法:

yteBuffer常用方法:

缓冲区案例
  • 缓冲区代码实例:
package icu.lookyousmileface.nio.basic;

import java.nio.IntBuffer;


public class NioBuffer {
    public static void main(String[] args) {
        //IntBuffer.allocate(5);表示Buffer的空间为5,并且Buffer缓冲区类型为Int
        IntBuffer intBuffer = IntBuffer.allocate(5);
        //intBuffer.capacity()表示获得Buffer的大小
        for(int i = 0;i< intBuffer.capacity();i++){

            intBuffer.put(i*2);

        }
        //buffer进行过写操作之后需要读操作的时候需要flip进行状态反转
        intBuffer.flip();
        
        //hasRemaining()返回剩余的可用长度
        while (intBuffer.hasRemaining()){
            System.out.println(intBuffer.get());
        }
    }
}

Channel(通道):

FileChannel:

  • TransferTo拷贝的比较快底层实现是零拷贝
  • ByteBuffer+FileChannel实现文字输入到文件中:
public class NioBufferChannelFileWrite {

    private static final String msg = "怒发冲冠,凭栏处、潇潇雨歇。抬望眼、仰天长啸,壮怀激烈。三十功名尘与土,八千里路云和月。莫等闲、白了少年头,空悲切。";

    public static void main(String[] args) {

        try {

            FileOutputStream fileOutputStream = new FileOutputStream(new File("src/main/resources/filedata/data1.txt"));

            FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();

            ByteBuffer fileDataBuffer = ByteBuffer.allocate(1024);

            ByteBuffer putData = fileDataBuffer.put(msg.getBytes());

            //反转
            putData.flip();

            fileOutputStreamChannel.write(putData);

            fileOutputStream.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

实现的流程图

ByteBuffer+FileChannel实现从文件中读取到控制台:

public class NioBufferChannelRead {

    private static final String filePath = "src/main/resources/filedata/data1.txt";

    public static void main(String[] args) {

        File file = new File(filePath);

        try {

            FileInputStream fileInputStream = new FileInputStream(file);

            FileChannel fileInputStreamChannel = fileInputStream.getChannel();
            //获取file的大小,避免浪费内存
            ByteBuffer byteDataBuffer = ByteBuffer.allocate((int) file.length());

            fileInputStreamChannel.read(byteDataBuffer);
            //byteDataBuffer.array()将byteBuffer缓冲区中的data变成数组
            System.out.println(new String(byteDataBuffer.array()));

            fileInputStream.close();

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

实现的流程图

Buffer的分散和聚焦
  • Buffer的分散和聚焦:

Scattering:将数据写入到buffer时,可以采取buffer数组,依次写入【 分散】

Gathering:从buffer读取数据时,可以采用buffer数组,依次读取【聚焦】

    • 实例代码(Kotlin):
fun main(args: Array): Unit {

    val serverSocketChannel = ServerSocketChannel.open();
    val inetSocketAddress = InetSocketAddress(9948);
    //绑定端口到socket上,并启动
    serverSocketChannel.socket().bind(inetSocketAddress);
    
    //buffer数组,NIO会自动将数据放到数组中,无需操心
    val byteBuffer = arrayOfNulls(2)
    byteBuffer[0] = ByteBuffer.allocate(5)
    byteBuffer[1] = ByteBuffer.allocate(3)

    val scoketChannel = serverSocketChannel.accept();
    var messageLight = 8;

    while (true) {

        var byteRead: Int = 0

        while (byteRead < messageLight) {

            var read = scoketChannel.read(byteBuffer)

            println("byteRead:" + read)

            byteRead += read.toInt()

            Arrays.asList(*byteBuffer).stream().map { buffer: ByteBuffer -> "potion:" + buffer.position() + "limit:" + buffer.limit() }.forEach { x: String? -> println(x) }
        }

        Arrays.asList(*byteBuffer).forEach(Consumer { buffer: ByteBuffer -> buffer.flip() })

        var byteWrite: Long = 0;

        while (byteWrite < messageLight) {

            var write = scoketChannel.write(byteBuffer)
            byteWrite += write
        }

        Arrays.asList(*byteBuffer).forEach(Consumer { buffer: ByteBuffer -> buffer.clear() })

        println("byteRead:" + byteRead + "byteWrite:" + byteWrite + "messagelenght:" + messageLight)

    }
}

一个Buffer实现文件的复制:

public class NioBufferOnlyOneWriteAndRead {

    private static final String filePath1 = "src/main/resources/filedata/data1.txt";

    private static final String filePath2 = "src/main/resources/filedata/data2.txt";

    public static void main(String[] args) {

        File file1 = new File(filePath1);
        File file2 = new File(filePath2);

        try {

            FileInputStream fileInputStream = new FileInputStream(file1);
            FileOutputStream fileOutputStream = new FileOutputStream(file2);

            FileChannel fileInputStreamChannel = fileInputStream.getChannel();
            FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();

            ByteBuffer dataBuffer = ByteBuffer.allocate(512);

            while (-1 != fileInputStreamChannel.read(dataBuffer)) {
                //读和写之间需要切换
                dataBuffer.flip();

                fileOutputStreamChannel.write(dataBuffer);
                //清空Buffer缓冲区的数据
                dataBuffer.clear();

            }
            fileInputStream.close();
            fileOutputStream.close();

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

利用transferFrom拷贝文件:

public class NioBufferTransorf {

    private static final String filepath1 = "src/main/resources/filedata/data1.txt";
    private static final String filepath2 = "src/main/resources/filedata/data2.txt";

    public static void main(String[] args) {

        try {

            FileInputStream fileInputStream = new FileInputStream(new File(filepath1));
            FileOutputStream fileOutputStream = new FileOutputStream(new File(filepath2));

            FileChannel fileInputStreamChannel = fileInputStream.getChannel();
            FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();

            //将目标的通道的数据复制到当前通道,Channel自带数据有效长度size获取
            fileOutputStreamChannel.transferFrom(fileInputStreamChannel, 0, fileInputStreamChannel.size());

            fileInputStreamChannel.close();
            fileOutputStreamChannel.close();
            fileInputStream.close();
            fileOutputStream.close();

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

利用transferFrom拷贝Image(Kotlin):

fun main(args: Array):Unit {

    val  imagePath = "src/main/resources/filedata/sky.jpg";
    val  copyImageToPath = "src/main/resources/filedata/sky_copy.jpg"

    val accessFile = RandomAccessFile(imagePath, "rw")
    val accessFile_copy = RandomAccessFile(copyImageToPath, "rw")

    val accessFile_channle = accessFile.channel
    val accesssFile_copy_channel = accessFile_copy.channel

    accesssFile_copy_channel.transferFrom(accessFile_channle,0,accessFile_channle.size())
}

Buffer和Channel的注意事项:

NIO案例
  • 实例代码:
    • NioServer.java
package bigdata.studynio;
 
public class NioServer {
	
	public static void main(String[] args) {
		int port = 8080;
		if(args != null && args.length < 0){
			
			//port = Integer.valueOf(args[0]);	
		}
		//创建服务器线程
		NioServerWork nioServerWork = new NioServerWork(port);
		new Thread(nioServerWork, "server").start();
	}
}

NioServerWork.java

package bigdata.studynio;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
public class NioServerWork implements Runnable {
 
	//多路复用器 Selector会对注册在其上面的channel进行;轮询,当某个channel发生读写操作时,
	//就会处于相应的就绪状态,通过SelectionKey的值急性IO 操作
	private Selector selector;//多路复用器
	private ServerSocketChannel channel;
	private volatile boolean stop;
	
	public NioServerWork(int port) {
		try {
			selector = Selector.open();//打开多路复用器
			channel = ServerSocketChannel.open();//打开socketchannel
			channel.configureBlocking(false);//配置通道为非阻塞的状态
			channel.socket().bind(new InetSocketAddress(port), 1024);//通道socket绑定地址和端口
			channel.register(selector, SelectionKey.OP_ACCEPT);//将通道channel在多路复用器selector上注册为接收操作
			System.out.println("NIO 服务启动 端口: "+ port);
			
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}		
	}
	public void stop(){
		this.stop=true;
	}
 
	@Override
	public void run() {//线程的Runnable程序
		System.out.println("NIO 服务  run()");
		while(!stop){
			try {
				selector.select(1000);//最大阻塞时间1s
				//获取多路复用器的事件值SelectionKey,并存放在迭代器中
				Set selectedKeys = selector.selectedKeys();
				Iterator iterator = selectedKeys.iterator();
				SelectionKey key =null;
				//System.out.println("NIO 服务  try");
				while(iterator.hasNext()){
					System.out.println("NIO 服务  iterator.hasNext()");
					key = iterator.next();
					iterator.remove();//获取后冲迭代器中删除此值
					try {
						handleinput(key);//根据SelectionKey的值进行相应的读写操作				
					} catch (Exception e) {
						if(key!=null){
							key.cancel();
							if(key.channel()!=null)
								key.channel().close();							
						}
					}									
				}							
			} catch (IOException e) {
				System.out.println("NIO 服务  run  catch IOException");
				e.printStackTrace();
				System.exit(1);
			}
		}		
	}
 
	
	private void handleinput(SelectionKey key) throws IOException {
		System.out.println("NIO 服务  handleinput");
		if(key.isValid()){//判断所传的SelectionKey值是否可用
			if(key.isAcceptable()){//在构造函数中注册的key值为OP_ACCEPT,,在判断是否为接收操作
				ServerSocketChannel  ssc = (ServerSocketChannel)key.channel();//获取key值所对应的channel
				SocketChannel sc = ssc.accept();//设置为接收非阻塞通道
				sc.configureBlocking(false);
				sc.register(selector, SelectionKey.OP_READ);//并把这个通道注册为OP_READ			
			}
			if(key.isReadable()){//判断所传的SelectionKey值是否为OP_READ,通过上面的注册后,经过轮询后就会是此操作
				SocketChannel sc = (SocketChannel)key.channel();//获取key对应的channel
				ByteBuffer readbuf = ByteBuffer.allocate(1024);
				int readbytes = sc.read(readbuf);//从channel中读取byte数据并存放readbuf
				if(readbytes > 0){
					readbuf.flip();//检测时候为完整的内容,若不是则返回完整的
					byte[] bytes = new byte[readbuf.remaining()];
					readbuf.get(bytes);
					String string = new String(bytes, "UTF-8");//把读取的数据转换成string
					System.out.println("服务器接受到命令 :"+ string); 
					//"查询时间"就是读取的命令,此字符串要与客户端发送的一致,才能获取当前时间,否则就是bad order
					String currenttime = "查询时间".equalsIgnoreCase(string) ? new java.util.Date(System.currentTimeMillis()).toString() : "bad order";
					dowrite(sc,currenttime);//获取到当前时间后,就需要把当前时间的字符串发送出去
				}else if (readbytes < 0){
					key.cancel();
					sc.close();					
				}else{}				
			}			
		}		
	}
	
	private void dowrite(SocketChannel sc, String currenttime) throws IOException {
		System.out.println("服务器 dowrite  currenttime"+  currenttime);
	if(currenttime !=null && currenttime.trim().length()>0){
		byte[] bytes = currenttime.getBytes();//将当前时间序列化
		ByteBuffer writebuf = ByteBuffer.allocate(bytes.length);
		writebuf.put(bytes);//将序列化的内容写入分配的内存
		writebuf.flip();
		sc.write(writebuf);	//将此内容写入通道		
	}
 }
}

NioClient.java

package bigdata.studynio;
 
public class NioClient {
	
	public static void main(String[] args) {
		
		int port = 8080;
		if(args !=null && args.length > 0){
			try {
				//port = Integer.valueOf(args[0]);
			} catch (Exception e) {
				// TODO: handle exception
			}
		}
		//创建客户端线程
		new Thread(new NioClientWork("127.0.0.1",port),"client").start();
		
	}
 
}

NioClientWork.java

package bigdata.studynio;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
public class NioClientWork implements Runnable {
 
	private String host;
	private int port;
	private Selector selector;
	private SocketChannel socketChannel;
	private volatile boolean stop;
	
	
	public NioClientWork(String string, int port) {
		this.host = string == null ? "127.0.0.1":string;
		this.port = port;
		try {
			selector= Selector.open();//打开多路复用器
			socketChannel=SocketChannel.open();//打开socketchannel
			socketChannel.configureBlocking(false);
			System.out.println("NIO 客户端启动 端口: "+ port);
		} catch (IOException e) {
			e.printStackTrace();
			System.exit(1);
		}
		
	}
 
	
	@Override
	public void run() {
		try {
			doConnect();//客户端线程需要连接服务器
		} catch (Exception e) {
			e.printStackTrace();
			System.exit(1);
		}
		while(!stop){
			
			try {
				selector.select(1000);//最大阻塞时间1s
				//获取多路复用器的事件值SelectionKey,并存放在迭代器中
				Set selectedKeys = selector.selectedKeys();
				Iterator iterator = selectedKeys.iterator();
				SelectionKey key =null;
				while (iterator.hasNext()) {
					key = iterator.next();
					iterator.remove();
					try {
						handleinput(key);//获取多路复用器的事件值SelectionKey,并存放在迭代器中					
					} catch (Exception e) {
						if(key == null){
							key.cancel();
							if(socketChannel ==null)
								socketChannel.close();							
						}
					}					
				}
				
			} catch (IOException e) {
				e.printStackTrace();
				System.exit(1);
			}			
		}
		if(selector !=null){
			try {
				selector.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
	private void doConnect() throws IOException {
		if(socketChannel.connect(new InetSocketAddress(host, port))){//检测通道是否连接到服务器 
			System.out.println("NIO 客户端 idoConnect OP_READ ");
			socketChannel.register(selector, SelectionKey.OP_READ);//如果已经连接到了服务器,就把通道在selector注册为OP_READ
			dowrite(socketChannel);
		}else{
			System.out.println("NIO 客户端 doConnect OP_CONNECT ");
			socketChannel.register(selector, SelectionKey.OP_CONNECT);//如果客户端未连接到服务器,则将通道注册为OP_CONNECT操作
		}	
	}
 
	
	private void handleinput(SelectionKey key) throws IOException {
		//System.out.println("NIO 客户端 handleinput ");
		if(key.isValid()){//判断所传的SelectionKey值是否可用
			//System.out.println("NIO 客户端 isValid() ");
			SocketChannel sc = (SocketChannel) key.channel();
			if(key.isConnectable()){//一开始的时候,客户端需要连接服务器操作,所以检测是否为连接状态
				System.out.println("NIO 客户端 isConnectable ");
				if(sc.finishConnect()){//是否完成连接
					System.out.println("NIO 客户端 finishConnect ");
					dowrite(sc);//向通道内发送数据,就是“查询时间” 的命令,读写通道与通道注册事件类型无关,注册事件只是当有事件来了,就会去处理相应事件
					sc.register(selector, SelectionKey.OP_READ);//如果完成了连接,就把通道注册为 OP_READ操作,用于接收服务器出过来的数据
				}else{
					System.out.println("NIO 客户端 not finishConnect ");
					System.exit(1);				
				}
			}
			if(key.isReadable()){//根据上面注册的selector的通道读事件,进行操作
				System.out.println("NIO 客户端 isReadable() ");
				ByteBuffer readbuf = ByteBuffer.allocate(1024);
				int readbytes = sc.read(readbuf);//获取通道从服务器发过来的数据,并反序列化
				if(readbytes > 0){
					readbuf.flip();
					byte[] bytes=new byte[readbuf.remaining()];
					readbuf.get(bytes);
					String string = new String(bytes, "UTF-8");
					System.out.println("时间是: " + string);
					this.stop=true;	//操作完毕后,关闭所有的操作				
				}else if (readbytes < 0){
					key.cancel();
					sc.close();
					
				}else{}			
			 }				
		}		
	}
	private void dowrite(SocketChannel sc) throws IOException {
		byte[] req = "查询时间".getBytes();
		ByteBuffer writebuf = ByteBuffer.allocate(req.length);
		writebuf.put(req);
		writebuf.flip();
		sc.write(writebuf);
		if(!writebuf.hasRemaining()){
			System.out.println("向服务器发送命令成功 ");
		}	
	}
}

asReadOnlyBuffer例子:

public class NioBufferOnlyRead {

    public static void main(String[] args) {

        ByteBuffer dataBuffer = ByteBuffer.allocate(5);

        for (int i = 0; i < dataBuffer.capacity() - 1; i++) {
            dataBuffer.put((byte) (i * 2));
        }

        dataBuffer.flip();
        //可以从一个创建的Buffer获取OnlyReadBuffer
        ByteBuffer onlyByteBuffer = dataBuffer.asReadOnlyBuffer();

        while (onlyByteBuffer.hasRemaining()) {
            System.out.println(onlyByteBuffer.get());
        }
        //无法往OnlyReadBuffer写数据
//        onlyByteBuffer.put((byte)(2));
        //可以往dataBuffer中写数据
        dataBuffer.put((byte) (2));
    }
}
  • 指定Buffer的读写操作,会生成一个新的Buffer
  • MappedByteBuffer:可以实现文件在内存(堆外内存)直接修改,而操作系统无需再拷贝一份,实例代码:
public class NioBufferAsReadOnlyBuffer {

    public static void main(String[] args) {

        try {

            RandomAccessFile randomAccessFile = new RandomAccessFile("src/main/resources/filedata/data1.txt", "rw");

            FileChannel randomAccessFileChannel = randomAccessFile.getChannel();

            MappedByteBuffer map = randomAccessFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, randomAccessFileChannel.size());

            map.put(0, (byte) ('H'));

            randomAccessFile.close();
            randomAccessFileChannel.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

RandomAccessFile处理文本,mode的源码:

String name = (file != null ? file.getPath() : null);
        int imode = -1;
        if (mode.equals("r"))
            imode = O_RDONLY;
        else if (mode.startsWith("rw")) {
            imode = O_RDWR;
            rw = true;
            if (mode.length() > 2) {
                if (mode.equals("rws"))
                    imode |= O_SYNC;
                else if (mode.equals("rwd"))
                    imode |= O_DSYNC;
                else
                    imode = -1;
            }

由源码可知mode的四种模式对应的功能如下:

MappedByteBuffer属性解析:

public abstract MappedByteBuffer map(MapMode mode,
                                         long position, long size)
        throws IOException;

mode的三种模式:

        public static final MapMode READ_ONLY
            = new MapMode("READ_ONLY");

        
        public static final MapMode READ_WRITE
            = new MapMode("READ_WRITE");

        
        public static final MapMode PRIVATE
            = new MapMode("PRIVATE");
多路复用
  • Selector(多路复用器)
  1. Java的Nio,用到非阻塞的IO方式。可以用一个线程,处理多个客户端连接,就会使用到Selector(选择器)
  2. Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式注册到同一个Selector),如果有事件发生,便获取时间让后针对每个事件进行相应的处理,这样就可以只用一个单线程区管理多个通道,也就是管理多个连接和请求
  • Selector相关方法
//实现Closeable接口表示拥有自动关闭流的功能
public abstract class Selector implements Closeable {
//表示获得一个Selector实例
public static Selector open() throws IOException {
//表示设置超时时间,非阻塞,当有事件发生的时候将将注册到相应的SelectionLey中
public abstract int select(long timeout)
//表示获得已经注册了所有的Selectkey
public abstract Set selectedKeys();
//阻塞的方法
public abstract int select() throws IOException;
//在未超过select(long timeout)的范围中,可以wakeup唤醒selector
public abstract Selector wakeup();
//不阻塞,立即返回
public abstract int selectNow() throws IOException;
  • 获得了SelectionKey相当于获得了对应的Channel,SelectionKey和Channel之间是注册关系
  • Selectort、SelectionKey、ServerSocketChannel、SocketChannel

对上图的说明:

1.当客户端连接时,会通过ServerSocketChannel得到SocketChannel
2.将socketChannel注册到Selector上,register(Selector sel, int ops)一个selector上可以注册多个SocketChannel
3.注册后返回一个SelectionKey,会和该Selector关联(集合)
4.Selector进行监听select方法,返回有事件发生的通道的个数
5.进一步得到各个SelectionKey(有事件发生)
6.在通过SelectionKey反向获取SocketChannel,方法channel()
7.可以通过得到channel,完成业务处理

使用三大核心组件编写Server-Client实现上图业务逻辑:

  • Server(Java):
public class NioServer {

    public static void main(String[] args) {

        try {

            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

            Selector selector = Selector.open();

            serverSocketChannel.socket().bind(new InetSocketAddress(7798));
            //设置未非阻塞
            serverSocketChannel.configureBlocking(false);
            //注册selector,设置关注事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {

                if (selector.select(1000) == 0) {
                    System.out.println("超时1s!");
                    continue;
                }
                //selector>0表示触发了关注事件
                Set selectionKeys = selector.selectedKeys();
                Iterator iterator = selectionKeys.iterator();

                iterator.forEachRemaining(s -> {
                    //监听连接事件
                    if (s.isAcceptable()) {
                        try {
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            //非阻塞
                            socketChannel.configureBlocking(false);
                            System.out.println("一个客户端连接成功"+socketChannel.hashCode());
                            socketChannel.register(selector, OP_READ, ByteBuffer.allocate(1024));
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    //监听读事件
                    if (s.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel)s.channel();
                        ByteBuffer dataBuffer = (ByteBuffer)s.attachment();
                        try {
                            socketChannel.read(dataBuffer);
                            System.out.println("来自客户端:"+new String(dataBuffer.array()));
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        dataBuffer.clear();
                    }
                    iterator.remove();
                });
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Client(Kotlin):

fun main(args: Array):Unit {

    val data = "我深深地熟悉你脚步的韵律,它在我心中敲击."
    val socketChannel = SocketChannel.open();

    socketChannel.configureBlocking(false);

    val inetSocketAddress = InetSocketAddress("127.0.0.1",7798)

    if(!socketChannel.connect(inetSocketAddress)){
        while (!socketChannel.finishConnect()){
            
            System.out.println("连接服务器,需要时间....出去溜达会吧~")
        }
    }

    val dataBuffer = ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8))
    socketChannel.write(dataBuffer)
    System.`in`.read()
}

SelectionKey API

//注册channel对应的SelectionKey
public abstract class SelectionKey {
//得到与之与之关联的selector
public abstract Selector selector();
//得到与之关联的channel
public abstract SelectableChannel channel();
//是否可以读
public final boolean isReadable() 
//是否可以写
public final boolean isWritable() 
//是否可以accept
public final boolean isAcceptable()
//获得与之关联的数据 
public final Object attachment()
//改变监听事件
public abstract SelectionKey interestOps(int ops);

SelectionKey的四个重要属性,<<表示位运算向左移动

public static final int OP_READ = 1 << 0;//1 读操作
public static final int OP_WRITE = 1 << 2;//4 写操作
public static final int OP_CONNECT = 1 << 3;//8 已经连接
public static final int OP_ACCEPT = 1 << 4;//16 有新的网络可以accept

SocketChannel API

  1. SocketChannel,网络IO通道,具体负责进行读写操作,NIO把缓冲区的数据写入通道,或者把通道的数据读到缓冲区
public abstract class SocketChannel
    extends AbstractSelectableChannel
    implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel
{
//获得一个SocketChannel通道
public static SocketChannel open() throws IOException {
//设置阻塞或非阻塞
public abstract boolean isConnectionPending();
//连接服务器
public abstract boolean connect(SocketAddress remote) throws IOException;
//如果connect方法连接失败,接下来通过该方法进行完成连接操作
public abstract boolean finishConnect() throws IOException;
//往通道内写数据
public abstract int write(ByteBuffer src) throws IOException;
//往通道里读数据
public abstract int read(ByteBuffer dst) throws IOException;
//注册一个选择器斌设置监听事件。最后一个参数可以设置共享数据
public final SelectionKey register(Selector sel, int ops,
                                       Object att)
多人聊天室(NIO)
  • 多人聊天室
    • Server
public class GroupToChatWithServer {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private static final int Port = 7799;
    
    public GroupToChatWithServer() {

        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();

            serverSocketChannel.socket().bind(new InetSocketAddress(Port));

            serverSocketChannel.configureBlocking(false);

            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public void listen() {

        try {

            while (true) {

                int status = selector.select();
                if (status > 0) {

                    Set selectionKeys = selector.selectedKeys();
                    Iterator key_Iterator = selectionKeys.iterator();
                    key_Iterator.forEachRemaining(s -> {
                        if (s.isAcceptable()) {
                            try {
                                SocketChannel socketChannel = serverSocketChannel.accept();
                                socketChannel.configureBlocking(false);
                                socketChannel.register(selector, SelectionKey.OP_READ);
                                System.out.println(socketChannel.getRemoteAddress()+"上线了");
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if(s.isReadable()){
                            readData(s);
                        }
                        key_Iterator.remove();
                    });

                } else {
//                    System.out.println("等待中....");
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }
    
    private void readData(SelectionKey key){

        SocketChannel socketChannel = null;
        try{
            socketChannel = (SocketChannel) key.channel();
            ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
            int readCount = socketChannel.read(dataBuffer);

            if (readCount>0){
                String msg = new String(dataBuffer.array());
                System.out.println("from 客户端"+msg);
                sendInfo(msg,socketChannel);
            }

        }catch (Exception e){
            try {
                System.out.println(socketChannel.getRemoteAddress()+"下线了");
                key.cancel();
                socketChannel.close();
            } catch (IOException e2) {
                e2.printStackTrace();
            }
        }
    }
    
    private void sendInfo(String msg, SocketChannel self){
        System.out.println("服务器发送信息中...");
        selector.selectedKeys().stream().forEach(s->{
            Channel targetChannel = s.channel();
            //排除自己发送
            if (targetChannel instanceof SocketChannel && targetChannel != self){
                SocketChannel dest = (SocketChannel)targetChannel;
                ByteBuffer dataBuffer = ByteBuffer.wrap(msg.getBytes());
                try {
                    dest.write(dataBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

    }
    
    public static void main(String[] args) {
        GroupToChatWithServer chatWithServer = new GroupToChatWithServer();
        chatWithServer.listen();
    }
}

Client

public class GroupClient {

    private final String host = "127.0.0.1";
    private final int port = 7799;
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;
    
    public GroupClient() {

        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open(new InetSocketAddress(host,port));
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            username = socketChannel.getLocalAddress().toString().substring(1);
            System.out.println(username+"is ok!!!");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public  void sendInfo(String info){
        info = username+"说:"+info;
        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public void readInfo(){
        try {
            int status = selector.select();
            if (status>0){
                Iterator key_iterator = selector.selectedKeys().iterator();
                while (key_iterator.hasNext()) {
                    SelectionKey s = key_iterator.next();
                    if (s.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) s.channel();
                        ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
                        try {
                            socketChannel.read(dataBuffer);
                            String msg = new String(dataBuffer.array());
                            System.out.println(msg.trim());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
                key_iterator.remove();
            }else {
//                System.out.println("没有可用的通道...");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        GroupClient chatClient = new GroupClient();
        new Thread(){
            @Override
            public void run() {
                while (true){
                    chatClient.readInfo();
                    try {
                        sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
        //发送给服务端
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String str = scanner.nextLine();
            chatClient.sendInfo(str);
        }
    }
}
零拷贝
  • 零拷贝:是指没有CPU拷贝
    • 实在网络传输优化的重要手段,从OS角度来看,内核缓冲区只有唯一的一份数据,不存在重复,更少使用CPU缓存伪共享以及无CPU校验和计算

常用两种零拷贝mmap(内存映射)和sendFile

    • 传统io

  • 三次上下文切换
  • Hard driver→kernel buffer→user buffer→socket buffer→protocol engine
  • kernel buffer、user buffer、socket buffer(两次cpu拷贝)

mmao优化

  • 两次上下文切换
  • Hard driver→kernel buffer→socket buffer→protocol engine
  • mmap直接将Hard driver文件映射到kernel buffer上,kernel buffer和user buffer共享数据,通过直接操作kernel buffer(内核内存)数据,实现文件的操作
  • kernel buffer、socket buffer(一次cpu拷贝)

sendFile

Linux2.1

  • 两次上文切换
  • Hard driver→kernel buffer→socket buffer→protocol engine
  • kernel buffer 、socket buffer(一次cpu拷贝(拷贝的是元数据,比如:数据长度等))
  • CPU copy:cpu拷贝,DMA copy:内存拷贝
  • Linux2.4

  • Hard driver →kernel buffer—→(socket buffer(copy kernel buffer元数据,比如lenght等,可以忽略))→protocol engine
  • kernel buffer 可以直接通过内存拷贝到协议栈protocol engin,但是还是有少量数据需要cpu copy到socket buffer上

使用.transferTo零拷贝传输文件

Server

public class NioServer implements Serializable {

    private static final String host = "127.0.0.1";
    private static final int port = 8899;

    public static void main(String[] args) throws IOException {

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(host, port));
        ByteBuffer dataBuffer = ByteBuffer.allocate(4098);

        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            int readCount = 0;
            while (-1 != readCount) {
                try {
                    //统计读取字节
                    readCount = socketChannel.read(dataBuffer);
                }catch (IOException e){
                    break;
                }
                //可以让dataBuffer复用,使position = 0;mark = -1作废
                dataBuffer.rewind();
            }
        }
    }
}
  • .rewind:表示buffer的倒带,也就是position=0,mark标志位无效

Client:

public class NioClient implements Serializable {

    private static final  String fileName = "protoc-3.6.1-win32.zip";
    private static final String  host = "127.0.0.1";
    private static final int port = 8899;

    public static void main(String[] args) throws IOException {

        SocketChannel socketChannel = SocketChannel.open();
				//客户端使用connect连接
        socketChannel.connect(new InetSocketAddress(host,port));
        //文件传输通道
        FileChannel fileChannel = new FileInputStream(new File(fileName)).getChannel();

        long start = System.currentTimeMillis();

        long byteNum = fileChannel.transferTo(0, fileChannel.size(), socketChannel);

        System.out.println("传输的字节数:"+byteNum+"耗时:"+(System.currentTimeMillis()-start));

        fileChannel.close();

    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/848139.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号