BIO通信模型以经典的时间服务器为模型
首先,熟悉BIO的服务端通信模型:采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的一请求一应答通信模型。
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,由于线程是Java虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大。系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务。
同步阻塞式I/O创建的 TimeServer源码分析 TimeServer (时间服务器)分别以服务端和客户端的源码进行分析
package bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class TimeServer {
public static void main(String[] args) {
int port = 8088;
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
System.out.println("The time server is start in port:" + port);
Socket socket;
while (true){
socket = serverSocket.accept();
new Thread(new TimeServerHandler(socket)).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
System.out.println("the time server close");
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
TimeServer根据传入的参数设置监听端口,如果没有入参,使用默认值8080
通过构造函数创建ServerSocket,如果端口合法且没有被占用,服务端监听成功
通过一个无限循环来监听客户端的连接,如果没有客户端接入,
则主线程阻塞在ServerSocket的 accept 操作上。
启动TimeServer,通过JvisualVM打印线程堆栈,
我们可以发现主程序确实阻塞在accept操作上,
当有新的客户端接入的时候,执行代码
以Socket为参数构造TimeServerHandler对象,TimeServerHandler是一个 Runnable,
使用它为构造函数的参数创建一个新的客户端
线程处理这条Socket链路。下面我们继续分析TimeServerHandler的代码。
package bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;
public class TimeServerHandler implements Runnable{
private Socket socket;
public TimeServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String currentTime = null;
String body = null;
while (true) {
body = in.readLine();
if (body == null)
break;
System.out.println("The time server receive order : "+ body);
currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
? new Date(System.currentTimeMillis()).toString() : "BAO ORDER";
System.out.println(currentTime);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭输入流
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 关闭输出流
if (out != null) {
out.close();
out = null;
}
// 关闭socket
if (this.socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
this.socket = null;
}
}
}
}
通过 BufferedReader读取一行,如果已经读到了输入流的尾部,则返回值为null,退出循环。如果读到了非空值,则对内容进行判断,如果请求消息为查询时间的指令"QUERY TIME ORDER",则获取当前最新的系统时间,通过PrintWriter 的 println函数发送给客户端,最后退出循环。释放输入流、输出流和Socket套接字句柄资源,最后线程自动销毁并被虚拟机回收。
同步阻塞式/O创建的TimeClient源码分析 TimeClient (时间客户端)客户端通过Socket创建,发送查询时间服务器的"QUERY TIME ORDER"指令,然后读取服务端的响应并将结果打印出来,随后关闭连接,释放资源,程序退出执行。
package bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class TimeClient {
public static void main(String[] args) {
int port = 8088;
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket( "127.0.0.1",port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("QUERY TIME ORDER");
System.out.println("Send order 2 server succeed");
String resp = in.readLine();
System.out.println("NOW is" +resp);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭输入流
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 关闭输出流
if (out != null) {
out.close();
}
// 关闭socket
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}
客户端通过PrintWriter向服务端发送"QUERY TIME ORDER"指令,
然后通过BufferedReader 的readLine读取响应并打印。
分别执行服务端和客户端,执行结果如下。
服务端执行结果如图
客户端执行结果如图
到此为止,同步阻塞式I/O开发的时间服务器程序已经讲解完毕。我们发现,BIO主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程处理新接入的客户端链路,一个线程只能处理一个客户端连接。在高性能服务器应用领域,往往需要面向成千上万个客户端的并发连接,这种模型显然无法满足高性能、高并发接入的场景。
为了改进一线程一连接模型,后来又演进出了一种通过线程池或者消息队列实现1个或者多个线程处理N个客户端的模型,由于它的底层通信机制依然使用同步阻塞I/O,所以被称为“伪异步”。下面的章节我们就对伪异步代码进行分析,看看伪异步是否能够满足我们对高性能、高并发接入的诉求。
伪异步I/〇编程为了解决同步阻塞IO面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化——后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N。通过线程池可以灵活地调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。
下面,我们结合连接模型图和源码,对伪异步I/O进行分析,看它是否能够解决同步阻塞I/O面临的问题。
采用线程池和任务队列可以实现一种叫做伪异步的I/O通信框架,它的模型图如图2-5所示。
当有新的客户端接入时,将客户端的Socket封装成一个 Task(该任务实现 java.lang.Runnable接口)投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列和N个活跃线程,对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。
package bio_plus;
import bio.TimeServerHandler;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class TimeServer {
public static void main(String[] args) {
int port = 8099;
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
System.out.println("The time server is start in port:" + port);
Socket socket = null;
// 多线程 创建I/O 任务线程池
TimeServerHandlerExecutePool timeServerHandlerExecutePool =
new TimeServerHandlerExecutePool(50, 10000);
while (true) {
socket = serverSocket.accept();
timeServerHandlerExecutePool.execute(new TimeServerHandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
System.out.println("The time server close");
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
serverSocket = null;
}
}
}
}
伪异步I/O的主函数代码发生了变化,我们首先创建一个时间服务器处理类的线程池,当接收到新的客户端连接时,将请求Socket封装成一个 Task,然后调用线程池的execute方法执行,从而避免了每个请求接入都创建一个新的线程。
伪异步I/O的TimeServerHandlerExecutePoolpackage bio_plus;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TimeServerHandlerExecutePool {
private ExecutorService executor;
public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize));
}
public void execute(Runnable task) {
executor.execute(task);
}
}
TimeServerHandler
package bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;
public class TimeServerHandler implements Runnable{
private Socket socket;
public TimeServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String currentTime = null;
String body = null;
while (true) {
body = in.readLine();
if (body == null)
break;
System.out.println("The time server receive order : "+ body);
currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
? new Date(System.currentTimeMillis()).toString() : "BAO ORDER";
System.out.println(currentTime);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭输入流
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 关闭输出流
if (out != null) {
out.close();
out = null;
}
// 关闭socket
if (this.socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
this.socket = null;
}
}
}
}
TimeClient
package bio_plus;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
public class TimeClient {
public static void main(String[] args) {
int port = 8099;
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket( "127.0.0.1",port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("QUERY TIME ORDER");
System.out.println("Send order 2 server succeed");
String resp = in.readLine();
System.out.println("NOW is" +resp);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭输入流
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 关闭输出流
if (out != null) {
out.close();
}
// 关闭socket
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}
由于线程池和消息队列都是有界的,因此,无论客户端并发连接数多大,它都不会导致线程个数过于膨胀或者内存溢出,相比于传统的一连接一线程模型,是一种改良。
由于客户端代码并没有改变,因此,我们直接运行服务端和客户端,执行结果如下。服务端运行结果如图所示。
服务端运行结果如图所示。
客户端运行结果如图所示。
伪异步IO通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。但是由于它底层的通信依然采用同步阻塞模型,因此无法从根本上解决问题。下个小节我们对伪异步I/O进行深入分析,找到它的弊端,然后看看NIO是如何从根本上解决这个问题的。



