两个程序通过一个双向的通信连接实现数据的交换,连接的一端称为一个socket
- Socket是一个语言无标准,可以实现网络编程语言都有Socket
- 通过·IP+Port通信
- BIO、NIO、AIO适用场景
Java IO流程图 Socket连接步骤BIO:连接数少且固定的框架
NIO: 连接数多且连接时间短
AIO(NIO.2): 连接数多且连接时间长
- 服务器监听
- 客户端请求
- 连接确定
- Tips:连接的时候三次握手,断开连接四次挥手
- 同步:使用同步IO时,Java自己处理IO读写
- 异步:使用异步Io时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小给OS(用户数据),OS需要支持异步IO操作API
- 阻塞:使用阻塞IO时,Java调用会一直阻塞到读写完成才返回。
- 非阻塞:使用非阻塞IO时,如果不能读写Java调用会马上返回,当IO事件分发器通知可读写时再继续进行读写,不循环直到读写完成。
- Blocking IO:同步阻塞的编程方式
- BIO编程方式通常是在JDK1.4版本之前常用的编程方式。
- 编程实现过程:首先服务端启动一个ServerSocket来监听网络请求,客户端启动Socket发起网络请求,默认情况下ServerSocket会建立一个线程来处理这个请求,如果服务端没有线程可用,客户端会阻塞等待或遭到拒绝。(可以加入线程池)
- 改善
Server:
package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public final class ServerNormal {
//默认的端口号
private static int DEFAULT_PORT = 12345;
//单例的ServerSocket
private static ServerSocket server;
//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
public static void start() throws IOException{
//使用默认值
start(DEFAULT_PORT);
}
//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
public synchronized static void start(int port) throws IOException{
if(server != null) return;
try{
//通过构造函数创建ServerSocket
//如果端口合法且空闲,服务端就监听成功
server = new ServerSocket(port);
System.out.println("服务器已启动,端口号:" + port);
//通过无线循环监听客户端连接
//如果没有客户端接入,将阻塞在accept操作上。
while(true){
Socket socket = server.accept();
//当有新的客户端接入时,会执行下面的代码
//然后创建一个新的线程处理这条Socket链路
new Thread(new ServerHandler(socket)).start();
}
}finally{
//一些必要的清理工作
if(server != null){
System.out.println("服务器已关闭。");
server.close();
server = null;
}
}
}
}
ServerHandler
package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import com.anxpp.io.utils.Calculator;
public class ServerHandler implements Runnable{
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try{
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
String expression;
String result;
while(true){
//通过BufferedReader读取一行
//如果已经读到输入流尾部,返回null,退出循环
//如果得到非空值,就尝试计算结果并返回
if((expression = in.readLine())==null) break;
System.out.println("服务器收到消息:" + expression);
try{
result = Calculator.cal(expression).toString();
}catch(Exception e){
result = "计算错误:" + e.getMessage();
}
out.println(result);
}
}catch(Exception e){
e.printStackTrace();
}finally{
//一些必要的清理工作
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if(out != null){
out.close();
out = null;
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}
client
package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class Client {
//默认的端口号
private static int DEFAULT_SERVER_PORT = 12345;
private static String DEFAULT_SERVER_IP = "127.0.0.1";
public static void send(String expression){
send(DEFAULT_SERVER_PORT,expression);
}
public static void send(int port,String expression){
System.out.println("算术表达式为:" + expression);
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try{
socket = new Socket(DEFAULT_SERVER_IP,port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
out.println(expression);
System.out.println("___结果为:" + in.readLine());
}catch(Exception e){
e.printStackTrace();
}finally{
//一下必要的清理工作
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if(out != null){
out.close();
out = null;
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}
测试代码,放在一个程序中
package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.util.Random;
public class Test {
//测试主方法
public static void main(String[] args) throws InterruptedException {
//运行服务器
new Thread(new Runnable() {
@Override
public void run() {
try {
ServerBetter.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
//避免客户端先于服务器启动前执行代码
Thread.sleep(100);
//运行客户端
char operators[] = {'+','-','*','/'};
Random random = new Random(System.currentTimeMillis());
new Thread(new Runnable() {
@SuppressWarnings("static-access")
@Override
public void run() {
while(true){
//随机产生算术表达式
String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
Client.send(expression);
try {
Thread.currentThread().sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
NIO编程
- Ubblocking(New IO):同步阻塞的编程方式
- 主要解决BIO的大并发问题,NIO基于Reactor,面向Buffer(缓存区)编程
- 不能完全解决BIO上的问题,当并发上来的话,还是会有BIO一样的问题
- 同步非阻塞,服务器实现模式为一个请求一个通道,即客户端发送的连接请求都会注册到多路复用器器上,多路复用器轮询到有I/O操作请求时才启动一个线程进行处理
- NIO方式使用于连接数目多且比较短(轻操作)的架构,比如:聊天服务器,并发局限于应用中,编程复杂,JDK1.8开始支持
- NIO核心三大组件:Selector(多路复用器)、Channel(通道)、Buffer(缓冲区)
- NIO核心三大组件之间的关系
1.每个 channel都会对应一个 Buffer 2.Selector对应一个线程,一个线程对应多个 channel(连接) 3.该图反应了有三个 channel?注册到该 selector/程序 4.程序切换到哪个 channel是有事件决定的, Event就是一个重要的概念 5.Selector会根据不同的事件,在各个通道上切换 6.Buffer就是一个内存块,底层是有一个数组 7.数据的读取写入是通过 Buffer,这个和BIO,BIO中要么是输入流,或者是输出流,不能双向,但是NIO的 Buffer是可以读也可以写,需要ip方法切换 8.channel是双向的,可以返回底层操作系统的情况,比如 Linux,底层的操作系通道就是双向的
- Buffer有7个子类(没有BooleanBuffer):ByteBuffer、IntBuffer、CharBuffer、LongBuffer、DoubleBuffer、FloatBuffer、ShortBuffer
- 最常用的是ByteBuffer
- Buffer重要的四个属性:
- mark:标志位
- position:下标指针
- limit:当前缓冲区的终点
- capacity:缓冲区容量
- flip通过改变这四个属性的值达到反转Buffer状态的功能
Buffer常用方法:
yteBuffer常用方法:
缓冲区案例- 缓冲区代码实例:
package icu.lookyousmileface.nio.basic;
import java.nio.IntBuffer;
public class NioBuffer {
public static void main(String[] args) {
//IntBuffer.allocate(5);表示Buffer的空间为5,并且Buffer缓冲区类型为Int
IntBuffer intBuffer = IntBuffer.allocate(5);
//intBuffer.capacity()表示获得Buffer的大小
for(int i = 0;i< intBuffer.capacity();i++){
intBuffer.put(i*2);
}
//buffer进行过写操作之后需要读操作的时候需要flip进行状态反转
intBuffer.flip();
//hasRemaining()返回剩余的可用长度
while (intBuffer.hasRemaining()){
System.out.println(intBuffer.get());
}
}
}
Channel(通道):
FileChannel:
- TransferTo拷贝的比较快底层实现是零拷贝
- ByteBuffer+FileChannel实现文字输入到文件中:
public class NioBufferChannelFileWrite {
private static final String msg = "怒发冲冠,凭栏处、潇潇雨歇。抬望眼、仰天长啸,壮怀激烈。三十功名尘与土,八千里路云和月。莫等闲、白了少年头,空悲切。";
public static void main(String[] args) {
try {
FileOutputStream fileOutputStream = new FileOutputStream(new File("src/main/resources/filedata/data1.txt"));
FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
ByteBuffer fileDataBuffer = ByteBuffer.allocate(1024);
ByteBuffer putData = fileDataBuffer.put(msg.getBytes());
//反转
putData.flip();
fileOutputStreamChannel.write(putData);
fileOutputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
实现的流程图
ByteBuffer+FileChannel实现从文件中读取到控制台:
public class NioBufferChannelRead {
private static final String filePath = "src/main/resources/filedata/data1.txt";
public static void main(String[] args) {
File file = new File(filePath);
try {
FileInputStream fileInputStream = new FileInputStream(file);
FileChannel fileInputStreamChannel = fileInputStream.getChannel();
//获取file的大小,避免浪费内存
ByteBuffer byteDataBuffer = ByteBuffer.allocate((int) file.length());
fileInputStreamChannel.read(byteDataBuffer);
//byteDataBuffer.array()将byteBuffer缓冲区中的data变成数组
System.out.println(new String(byteDataBuffer.array()));
fileInputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
实现的流程图
Buffer的分散和聚焦- Buffer的分散和聚焦:
Scattering:将数据写入到buffer时,可以采取buffer数组,依次写入【 分散】
Gathering:从buffer读取数据时,可以采用buffer数组,依次读取【聚焦】
-
- 实例代码(Kotlin):
fun main(args: Array): Unit { val serverSocketChannel = ServerSocketChannel.open(); val inetSocketAddress = InetSocketAddress(9948); //绑定端口到socket上,并启动 serverSocketChannel.socket().bind(inetSocketAddress); //buffer数组,NIO会自动将数据放到数组中,无需操心 val byteBuffer = arrayOfNulls (2) byteBuffer[0] = ByteBuffer.allocate(5) byteBuffer[1] = ByteBuffer.allocate(3) val scoketChannel = serverSocketChannel.accept(); var messageLight = 8; while (true) { var byteRead: Int = 0 while (byteRead < messageLight) { var read = scoketChannel.read(byteBuffer) println("byteRead:" + read) byteRead += read.toInt() Arrays.asList (*byteBuffer).stream().map { buffer: ByteBuffer -> "potion:" + buffer.position() + "limit:" + buffer.limit() }.forEach { x: String? -> println(x) } } Arrays.asList (*byteBuffer).forEach(Consumer { buffer: ByteBuffer -> buffer.flip() }) var byteWrite: Long = 0; while (byteWrite < messageLight) { var write = scoketChannel.write(byteBuffer) byteWrite += write } Arrays.asList (*byteBuffer).forEach(Consumer { buffer: ByteBuffer -> buffer.clear() }) println("byteRead:" + byteRead + "byteWrite:" + byteWrite + "messagelenght:" + messageLight) } }
一个Buffer实现文件的复制:
public class NioBufferOnlyOneWriteAndRead {
private static final String filePath1 = "src/main/resources/filedata/data1.txt";
private static final String filePath2 = "src/main/resources/filedata/data2.txt";
public static void main(String[] args) {
File file1 = new File(filePath1);
File file2 = new File(filePath2);
try {
FileInputStream fileInputStream = new FileInputStream(file1);
FileOutputStream fileOutputStream = new FileOutputStream(file2);
FileChannel fileInputStreamChannel = fileInputStream.getChannel();
FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
ByteBuffer dataBuffer = ByteBuffer.allocate(512);
while (-1 != fileInputStreamChannel.read(dataBuffer)) {
//读和写之间需要切换
dataBuffer.flip();
fileOutputStreamChannel.write(dataBuffer);
//清空Buffer缓冲区的数据
dataBuffer.clear();
}
fileInputStream.close();
fileOutputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
利用transferFrom拷贝文件:
public class NioBufferTransorf {
private static final String filepath1 = "src/main/resources/filedata/data1.txt";
private static final String filepath2 = "src/main/resources/filedata/data2.txt";
public static void main(String[] args) {
try {
FileInputStream fileInputStream = new FileInputStream(new File(filepath1));
FileOutputStream fileOutputStream = new FileOutputStream(new File(filepath2));
FileChannel fileInputStreamChannel = fileInputStream.getChannel();
FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
//将目标的通道的数据复制到当前通道,Channel自带数据有效长度size获取
fileOutputStreamChannel.transferFrom(fileInputStreamChannel, 0, fileInputStreamChannel.size());
fileInputStreamChannel.close();
fileOutputStreamChannel.close();
fileInputStream.close();
fileOutputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
利用transferFrom拷贝Image(Kotlin):
fun main(args: Array):Unit { val imagePath = "src/main/resources/filedata/sky.jpg"; val copyImageToPath = "src/main/resources/filedata/sky_copy.jpg" val accessFile = RandomAccessFile(imagePath, "rw") val accessFile_copy = RandomAccessFile(copyImageToPath, "rw") val accessFile_channle = accessFile.channel val accesssFile_copy_channel = accessFile_copy.channel accesssFile_copy_channel.transferFrom(accessFile_channle,0,accessFile_channle.size()) }
Buffer和Channel的注意事项:
NIO案例- 实例代码:
- NioServer.java
package bigdata.studynio;
public class NioServer {
public static void main(String[] args) {
int port = 8080;
if(args != null && args.length < 0){
//port = Integer.valueOf(args[0]);
}
//创建服务器线程
NioServerWork nioServerWork = new NioServerWork(port);
new Thread(nioServerWork, "server").start();
}
}
NioServerWork.java
package bigdata.studynio;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioServerWork implements Runnable {
//多路复用器 Selector会对注册在其上面的channel进行;轮询,当某个channel发生读写操作时,
//就会处于相应的就绪状态,通过SelectionKey的值急性IO 操作
private Selector selector;//多路复用器
private ServerSocketChannel channel;
private volatile boolean stop;
public NioServerWork(int port) {
try {
selector = Selector.open();//打开多路复用器
channel = ServerSocketChannel.open();//打开socketchannel
channel.configureBlocking(false);//配置通道为非阻塞的状态
channel.socket().bind(new InetSocketAddress(port), 1024);//通道socket绑定地址和端口
channel.register(selector, SelectionKey.OP_ACCEPT);//将通道channel在多路复用器selector上注册为接收操作
System.out.println("NIO 服务启动 端口: "+ port);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void stop(){
this.stop=true;
}
@Override
public void run() {//线程的Runnable程序
System.out.println("NIO 服务 run()");
while(!stop){
try {
selector.select(1000);//最大阻塞时间1s
//获取多路复用器的事件值SelectionKey,并存放在迭代器中
Set selectedKeys = selector.selectedKeys();
Iterator iterator = selectedKeys.iterator();
SelectionKey key =null;
//System.out.println("NIO 服务 try");
while(iterator.hasNext()){
System.out.println("NIO 服务 iterator.hasNext()");
key = iterator.next();
iterator.remove();//获取后冲迭代器中删除此值
try {
handleinput(key);//根据SelectionKey的值进行相应的读写操作
} catch (Exception e) {
if(key!=null){
key.cancel();
if(key.channel()!=null)
key.channel().close();
}
}
}
} catch (IOException e) {
System.out.println("NIO 服务 run catch IOException");
e.printStackTrace();
System.exit(1);
}
}
}
private void handleinput(SelectionKey key) throws IOException {
System.out.println("NIO 服务 handleinput");
if(key.isValid()){//判断所传的SelectionKey值是否可用
if(key.isAcceptable()){//在构造函数中注册的key值为OP_ACCEPT,,在判断是否为接收操作
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();//获取key值所对应的channel
SocketChannel sc = ssc.accept();//设置为接收非阻塞通道
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);//并把这个通道注册为OP_READ
}
if(key.isReadable()){//判断所传的SelectionKey值是否为OP_READ,通过上面的注册后,经过轮询后就会是此操作
SocketChannel sc = (SocketChannel)key.channel();//获取key对应的channel
ByteBuffer readbuf = ByteBuffer.allocate(1024);
int readbytes = sc.read(readbuf);//从channel中读取byte数据并存放readbuf
if(readbytes > 0){
readbuf.flip();//检测时候为完整的内容,若不是则返回完整的
byte[] bytes = new byte[readbuf.remaining()];
readbuf.get(bytes);
String string = new String(bytes, "UTF-8");//把读取的数据转换成string
System.out.println("服务器接受到命令 :"+ string);
//"查询时间"就是读取的命令,此字符串要与客户端发送的一致,才能获取当前时间,否则就是bad order
String currenttime = "查询时间".equalsIgnoreCase(string) ? new java.util.Date(System.currentTimeMillis()).toString() : "bad order";
dowrite(sc,currenttime);//获取到当前时间后,就需要把当前时间的字符串发送出去
}else if (readbytes < 0){
key.cancel();
sc.close();
}else{}
}
}
}
private void dowrite(SocketChannel sc, String currenttime) throws IOException {
System.out.println("服务器 dowrite currenttime"+ currenttime);
if(currenttime !=null && currenttime.trim().length()>0){
byte[] bytes = currenttime.getBytes();//将当前时间序列化
ByteBuffer writebuf = ByteBuffer.allocate(bytes.length);
writebuf.put(bytes);//将序列化的内容写入分配的内存
writebuf.flip();
sc.write(writebuf); //将此内容写入通道
}
}
}
NioClient.java
package bigdata.studynio;
public class NioClient {
public static void main(String[] args) {
int port = 8080;
if(args !=null && args.length > 0){
try {
//port = Integer.valueOf(args[0]);
} catch (Exception e) {
// TODO: handle exception
}
}
//创建客户端线程
new Thread(new NioClientWork("127.0.0.1",port),"client").start();
}
}
NioClientWork.java
package bigdata.studynio;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioClientWork implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public NioClientWork(String string, int port) {
this.host = string == null ? "127.0.0.1":string;
this.port = port;
try {
selector= Selector.open();//打开多路复用器
socketChannel=SocketChannel.open();//打开socketchannel
socketChannel.configureBlocking(false);
System.out.println("NIO 客户端启动 端口: "+ port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try {
doConnect();//客户端线程需要连接服务器
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
while(!stop){
try {
selector.select(1000);//最大阻塞时间1s
//获取多路复用器的事件值SelectionKey,并存放在迭代器中
Set selectedKeys = selector.selectedKeys();
Iterator iterator = selectedKeys.iterator();
SelectionKey key =null;
while (iterator.hasNext()) {
key = iterator.next();
iterator.remove();
try {
handleinput(key);//获取多路复用器的事件值SelectionKey,并存放在迭代器中
} catch (Exception e) {
if(key == null){
key.cancel();
if(socketChannel ==null)
socketChannel.close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
if(selector !=null){
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void doConnect() throws IOException {
if(socketChannel.connect(new InetSocketAddress(host, port))){//检测通道是否连接到服务器
System.out.println("NIO 客户端 idoConnect OP_READ ");
socketChannel.register(selector, SelectionKey.OP_READ);//如果已经连接到了服务器,就把通道在selector注册为OP_READ
dowrite(socketChannel);
}else{
System.out.println("NIO 客户端 doConnect OP_CONNECT ");
socketChannel.register(selector, SelectionKey.OP_CONNECT);//如果客户端未连接到服务器,则将通道注册为OP_CONNECT操作
}
}
private void handleinput(SelectionKey key) throws IOException {
//System.out.println("NIO 客户端 handleinput ");
if(key.isValid()){//判断所传的SelectionKey值是否可用
//System.out.println("NIO 客户端 isValid() ");
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){//一开始的时候,客户端需要连接服务器操作,所以检测是否为连接状态
System.out.println("NIO 客户端 isConnectable ");
if(sc.finishConnect()){//是否完成连接
System.out.println("NIO 客户端 finishConnect ");
dowrite(sc);//向通道内发送数据,就是“查询时间” 的命令,读写通道与通道注册事件类型无关,注册事件只是当有事件来了,就会去处理相应事件
sc.register(selector, SelectionKey.OP_READ);//如果完成了连接,就把通道注册为 OP_READ操作,用于接收服务器出过来的数据
}else{
System.out.println("NIO 客户端 not finishConnect ");
System.exit(1);
}
}
if(key.isReadable()){//根据上面注册的selector的通道读事件,进行操作
System.out.println("NIO 客户端 isReadable() ");
ByteBuffer readbuf = ByteBuffer.allocate(1024);
int readbytes = sc.read(readbuf);//获取通道从服务器发过来的数据,并反序列化
if(readbytes > 0){
readbuf.flip();
byte[] bytes=new byte[readbuf.remaining()];
readbuf.get(bytes);
String string = new String(bytes, "UTF-8");
System.out.println("时间是: " + string);
this.stop=true; //操作完毕后,关闭所有的操作
}else if (readbytes < 0){
key.cancel();
sc.close();
}else{}
}
}
}
private void dowrite(SocketChannel sc) throws IOException {
byte[] req = "查询时间".getBytes();
ByteBuffer writebuf = ByteBuffer.allocate(req.length);
writebuf.put(req);
writebuf.flip();
sc.write(writebuf);
if(!writebuf.hasRemaining()){
System.out.println("向服务器发送命令成功 ");
}
}
}
asReadOnlyBuffer例子:
public class NioBufferOnlyRead {
public static void main(String[] args) {
ByteBuffer dataBuffer = ByteBuffer.allocate(5);
for (int i = 0; i < dataBuffer.capacity() - 1; i++) {
dataBuffer.put((byte) (i * 2));
}
dataBuffer.flip();
//可以从一个创建的Buffer获取OnlyReadBuffer
ByteBuffer onlyByteBuffer = dataBuffer.asReadOnlyBuffer();
while (onlyByteBuffer.hasRemaining()) {
System.out.println(onlyByteBuffer.get());
}
//无法往OnlyReadBuffer写数据
// onlyByteBuffer.put((byte)(2));
//可以往dataBuffer中写数据
dataBuffer.put((byte) (2));
}
}
- 指定Buffer的读写操作,会生成一个新的Buffer
- MappedByteBuffer:可以实现文件在内存(堆外内存)直接修改,而操作系统无需再拷贝一份,实例代码:
public class NioBufferAsReadOnlyBuffer {
public static void main(String[] args) {
try {
RandomAccessFile randomAccessFile = new RandomAccessFile("src/main/resources/filedata/data1.txt", "rw");
FileChannel randomAccessFileChannel = randomAccessFile.getChannel();
MappedByteBuffer map = randomAccessFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, randomAccessFileChannel.size());
map.put(0, (byte) ('H'));
randomAccessFile.close();
randomAccessFileChannel.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
RandomAccessFile处理文本,mode的源码:
String name = (file != null ? file.getPath() : null);
int imode = -1;
if (mode.equals("r"))
imode = O_RDONLY;
else if (mode.startsWith("rw")) {
imode = O_RDWR;
rw = true;
if (mode.length() > 2) {
if (mode.equals("rws"))
imode |= O_SYNC;
else if (mode.equals("rwd"))
imode |= O_DSYNC;
else
imode = -1;
}
由源码可知mode的四种模式对应的功能如下:
MappedByteBuffer属性解析:
public abstract MappedByteBuffer map(MapMode mode,
long position, long size)
throws IOException;
mode的三种模式:
public static final MapMode READ_ONLY
= new MapMode("READ_ONLY");
public static final MapMode READ_WRITE
= new MapMode("READ_WRITE");
public static final MapMode PRIVATE
= new MapMode("PRIVATE");
多路复用
- Selector(多路复用器)
- Java的Nio,用到非阻塞的IO方式。可以用一个线程,处理多个客户端连接,就会使用到Selector(选择器)
- Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式注册到同一个Selector),如果有事件发生,便获取时间让后针对每个事件进行相应的处理,这样就可以只用一个单线程区管理多个通道,也就是管理多个连接和请求
- Selector相关方法
//实现Closeable接口表示拥有自动关闭流的功能
public abstract class Selector implements Closeable {
//表示获得一个Selector实例
public static Selector open() throws IOException {
//表示设置超时时间,非阻塞,当有事件发生的时候将将注册到相应的SelectionLey中
public abstract int select(long timeout)
//表示获得已经注册了所有的Selectkey
public abstract Set selectedKeys();
//阻塞的方法
public abstract int select() throws IOException;
//在未超过select(long timeout)的范围中,可以wakeup唤醒selector
public abstract Selector wakeup();
//不阻塞,立即返回
public abstract int selectNow() throws IOException;
- 获得了SelectionKey相当于获得了对应的Channel,SelectionKey和Channel之间是注册关系
- Selectort、SelectionKey、ServerSocketChannel、SocketChannel
对上图的说明:
1.当客户端连接时,会通过ServerSocketChannel得到SocketChannel 2.将socketChannel注册到Selector上,register(Selector sel, int ops)一个selector上可以注册多个SocketChannel 3.注册后返回一个SelectionKey,会和该Selector关联(集合) 4.Selector进行监听select方法,返回有事件发生的通道的个数 5.进一步得到各个SelectionKey(有事件发生) 6.在通过SelectionKey反向获取SocketChannel,方法channel() 7.可以通过得到channel,完成业务处理
使用三大核心组件编写Server-Client实现上图业务逻辑:
- Server(Java):
public class NioServer {
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
serverSocketChannel.socket().bind(new InetSocketAddress(7798));
//设置未非阻塞
serverSocketChannel.configureBlocking(false);
//注册selector,设置关注事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
if (selector.select(1000) == 0) {
System.out.println("超时1s!");
continue;
}
//selector>0表示触发了关注事件
Set selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
iterator.forEachRemaining(s -> {
//监听连接事件
if (s.isAcceptable()) {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
//非阻塞
socketChannel.configureBlocking(false);
System.out.println("一个客户端连接成功"+socketChannel.hashCode());
socketChannel.register(selector, OP_READ, ByteBuffer.allocate(1024));
} catch (IOException e) {
e.printStackTrace();
}
}
//监听读事件
if (s.isReadable()) {
SocketChannel socketChannel = (SocketChannel)s.channel();
ByteBuffer dataBuffer = (ByteBuffer)s.attachment();
try {
socketChannel.read(dataBuffer);
System.out.println("来自客户端:"+new String(dataBuffer.array()));
} catch (IOException e) {
e.printStackTrace();
}
dataBuffer.clear();
}
iterator.remove();
});
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client(Kotlin):
fun main(args: Array):Unit { val data = "我深深地熟悉你脚步的韵律,它在我心中敲击." val socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); val inetSocketAddress = InetSocketAddress("127.0.0.1",7798) if(!socketChannel.connect(inetSocketAddress)){ while (!socketChannel.finishConnect()){ System.out.println("连接服务器,需要时间....出去溜达会吧~") } } val dataBuffer = ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8)) socketChannel.write(dataBuffer) System.`in`.read() }
SelectionKey API
//注册channel对应的SelectionKey
public abstract class SelectionKey {
//得到与之与之关联的selector
public abstract Selector selector();
//得到与之关联的channel
public abstract SelectableChannel channel();
//是否可以读
public final boolean isReadable()
//是否可以写
public final boolean isWritable()
//是否可以accept
public final boolean isAcceptable()
//获得与之关联的数据
public final Object attachment()
//改变监听事件
public abstract SelectionKey interestOps(int ops);
SelectionKey的四个重要属性,<<表示位运算向左移动
public static final int OP_READ = 1 << 0;//1 读操作 public static final int OP_WRITE = 1 << 2;//4 写操作 public static final int OP_CONNECT = 1 << 3;//8 已经连接 public static final int OP_ACCEPT = 1 << 4;//16 有新的网络可以accept
SocketChannel API
- SocketChannel,网络IO通道,具体负责进行读写操作,NIO把缓冲区的数据写入通道,或者把通道的数据读到缓冲区
public abstract class SocketChannel
extends AbstractSelectableChannel
implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel
{
//获得一个SocketChannel通道
public static SocketChannel open() throws IOException {
//设置阻塞或非阻塞
public abstract boolean isConnectionPending();
//连接服务器
public abstract boolean connect(SocketAddress remote) throws IOException;
//如果connect方法连接失败,接下来通过该方法进行完成连接操作
public abstract boolean finishConnect() throws IOException;
//往通道内写数据
public abstract int write(ByteBuffer src) throws IOException;
//往通道里读数据
public abstract int read(ByteBuffer dst) throws IOException;
//注册一个选择器斌设置监听事件。最后一个参数可以设置共享数据
public final SelectionKey register(Selector sel, int ops,
Object att)
多人聊天室(NIO)
- 多人聊天室
- Server
public class GroupToChatWithServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private static final int Port = 7799;
public GroupToChatWithServer() {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(Port));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() {
try {
while (true) {
int status = selector.select();
if (status > 0) {
Set selectionKeys = selector.selectedKeys();
Iterator key_Iterator = selectionKeys.iterator();
key_Iterator.forEachRemaining(s -> {
if (s.isAcceptable()) {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println(socketChannel.getRemoteAddress()+"上线了");
} catch (IOException e) {
e.printStackTrace();
}
}
if(s.isReadable()){
readData(s);
}
key_Iterator.remove();
});
} else {
// System.out.println("等待中....");
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
private void readData(SelectionKey key){
SocketChannel socketChannel = null;
try{
socketChannel = (SocketChannel) key.channel();
ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
int readCount = socketChannel.read(dataBuffer);
if (readCount>0){
String msg = new String(dataBuffer.array());
System.out.println("from 客户端"+msg);
sendInfo(msg,socketChannel);
}
}catch (Exception e){
try {
System.out.println(socketChannel.getRemoteAddress()+"下线了");
key.cancel();
socketChannel.close();
} catch (IOException e2) {
e2.printStackTrace();
}
}
}
private void sendInfo(String msg, SocketChannel self){
System.out.println("服务器发送信息中...");
selector.selectedKeys().stream().forEach(s->{
Channel targetChannel = s.channel();
//排除自己发送
if (targetChannel instanceof SocketChannel && targetChannel != self){
SocketChannel dest = (SocketChannel)targetChannel;
ByteBuffer dataBuffer = ByteBuffer.wrap(msg.getBytes());
try {
dest.write(dataBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) {
GroupToChatWithServer chatWithServer = new GroupToChatWithServer();
chatWithServer.listen();
}
}
Client
public class GroupClient {
private final String host = "127.0.0.1";
private final int port = 7799;
private Selector selector;
private SocketChannel socketChannel;
private String username;
public GroupClient() {
try {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(host,port));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username+"is ok!!!");
} catch (IOException e) {
e.printStackTrace();
}
}
public void sendInfo(String info){
info = username+"说:"+info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
public void readInfo(){
try {
int status = selector.select();
if (status>0){
Iterator key_iterator = selector.selectedKeys().iterator();
while (key_iterator.hasNext()) {
SelectionKey s = key_iterator.next();
if (s.isReadable()) {
SocketChannel socketChannel = (SocketChannel) s.channel();
ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
try {
socketChannel.read(dataBuffer);
String msg = new String(dataBuffer.array());
System.out.println(msg.trim());
} catch (IOException e) {
e.printStackTrace();
}
}
}
key_iterator.remove();
}else {
// System.out.println("没有可用的通道...");
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
GroupClient chatClient = new GroupClient();
new Thread(){
@Override
public void run() {
while (true){
chatClient.readInfo();
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
//发送给服务端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String str = scanner.nextLine();
chatClient.sendInfo(str);
}
}
}
零拷贝
- 零拷贝:是指没有CPU拷贝
- 实在网络传输优化的重要手段,从OS角度来看,内核缓冲区只有唯一的一份数据,不存在重复,更少使用CPU缓存伪共享以及无CPU校验和计算
常用两种零拷贝mmap(内存映射)和sendFile
-
- 传统io
- 三次上下文切换
- Hard driver→kernel buffer→user buffer→socket buffer→protocol engine
- kernel buffer、user buffer、socket buffer(两次cpu拷贝)
mmao优化
- 两次上下文切换
- Hard driver→kernel buffer→socket buffer→protocol engine
- mmap直接将Hard driver文件映射到kernel buffer上,kernel buffer和user buffer共享数据,通过直接操作kernel buffer(内核内存)数据,实现文件的操作
- kernel buffer、socket buffer(一次cpu拷贝)
sendFile
Linux2.1
- 两次上文切换
- Hard driver→kernel buffer→socket buffer→protocol engine
- kernel buffer 、socket buffer(一次cpu拷贝(拷贝的是元数据,比如:数据长度等))
- CPU copy:cpu拷贝,DMA copy:内存拷贝
- Linux2.4
- Hard driver →kernel buffer—→(socket buffer(copy kernel buffer元数据,比如lenght等,可以忽略))→protocol engine
- kernel buffer 可以直接通过内存拷贝到协议栈protocol engin,但是还是有少量数据需要cpu copy到socket buffer上
使用.transferTo零拷贝传输文件
Server
public class NioServer implements Serializable {
private static final String host = "127.0.0.1";
private static final int port = 8899;
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(host, port));
ByteBuffer dataBuffer = ByteBuffer.allocate(4098);
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
int readCount = 0;
while (-1 != readCount) {
try {
//统计读取字节
readCount = socketChannel.read(dataBuffer);
}catch (IOException e){
break;
}
//可以让dataBuffer复用,使position = 0;mark = -1作废
dataBuffer.rewind();
}
}
}
}
- .rewind:表示buffer的倒带,也就是position=0,mark标志位无效
Client:
public class NioClient implements Serializable {
private static final String fileName = "protoc-3.6.1-win32.zip";
private static final String host = "127.0.0.1";
private static final int port = 8899;
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
//客户端使用connect连接
socketChannel.connect(new InetSocketAddress(host,port));
//文件传输通道
FileChannel fileChannel = new FileInputStream(new File(fileName)).getChannel();
long start = System.currentTimeMillis();
long byteNum = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
System.out.println("传输的字节数:"+byteNum+"耗时:"+(System.currentTimeMillis()-start));
fileChannel.close();
}
}



