- Reactor模型是相对传统IO机构来说的,也就是NIO模型, NIO模型之所以可以优化,得益于它是基于事件,基于异步,不像传统IO,是阻塞的Reactor模型分为几个组件,分别是Reactor、Acceptor、HandlerReactor组件负责分发事件,如果是连接那么交给Acceptor;如果是读写事件那么交给HandlerAcceptor负责处理连接事件(获取新连接,注册到Selector上,注册读写事件,绑定Handler)Handler负责处理读写事件(使用channel进行读写)
大致模型如下:
方案优缺点分析优点:模型简单,没有多线程、进程通信。竞争的问题,全部都在一个线程中完成
缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能,Handler 在处理某个连接上的业务时,整个进程无法处理其它连接事件,很容易导致性能瓶颈;可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度为 O(1) 的情况
工作原理示意图 实现客户端发送数据代码:
package com.company;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
public class ReactorClient {
public void start() throws IOException, InterruptedException {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(12345));
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//不断的自旋、等待连接完成
while (!socketChannel.finishConnect()) {
// do
}
ByteBuffer buffer = ByteBuffer.allocate(1024);
//发送数据到服务器
for (int i = 0; i < 100; i++) {
String data = "hello zhe " + i + " --- " + LocalDateTime.now();
System.out.println(data);
//buffer写模式
buffer.put(data.getBytes());
//buffer读模式
buffer.flip();
socketChannel.write(buffer);
//buffer切换写模式
buffer.clear();
Thread.sleep(5000);
}
//关闭
socketChannel.shutdownOutput();
socketChannel.close();
}
public static void main(String[] args) throws IOException, InterruptedException {
new ReactorClient().start();
}
}
服务器代码
package com.company;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class ReactorServer implements Runnable {
Selector selector;
ServerSocketChannel serverSocketChannel;
public ReactorServer() throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(12345));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //接收accept事件
System.out.println("ReactorServer初始化成功 - 1");
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selectionKeys = selector.selectedKeys();
Iterator keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
if (selectionKey.isAcceptable()) {
new ReactorAcceptor(selector, serverSocketChannel).run();
} else if (selectionKey.isReadable()) {
new ReactorHandler(selectionKey).run();
}
keyIterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
Thread thread = new Thread(new ReactorServer());
thread.start();
}
}
class ReactorAcceptor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
public ReactorAcceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("连接新注册:" + socketChannel.getRemoteAddress());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
class ReactorHandler implements Runnable {
SelectionKey selectionKey;
SocketChannel socketChannel;
public ReactorHandler(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
@Override
public void run() {
try {
if (selectionKey.isReadable()) {
readOperate();
} else {
//写操作
}
} catch (Exception e) {
closeChannel();
}
}
private synchronized void readOperate() throws IOException {
socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = socketChannel.read(buffer)) > 0) {
String str = new String(buffer.array(), 0, len);
System.out.println("读取字符串 " + str);
}
}
private void closeChannel() {
selectionKey.cancel();
try {
socketChannel.close();
} catch (IOException e) {
System.out.println("close error");
}
System.out.println("关闭连接");
}
}
DEMO
Client:
Server:



