栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

NIO 单线程Reactor模式 客户端+服务端

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

NIO 单线程Reactor模式 客户端+服务端

Reactor模式原理
    Reactor模型是相对传统IO机构来说的,也就是NIO模型, NIO模型之所以可以优化,得益于它是基于事件,基于异步,不像传统IO,是阻塞的Reactor模型分为几个组件,分别是Reactor、Acceptor、HandlerReactor组件负责分发事件,如果是连接那么交给Acceptor;如果是读写事件那么交给HandlerAcceptor负责处理连接事件(获取新连接,注册到Selector上,注册读写事件,绑定Handler)Handler负责处理读写事件(使用channel进行读写)

大致模型如下:

方案优缺点分析

优点:模型简单,没有多线程、进程通信。竞争的问题,全部都在一个线程中完成

缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能,Handler 在处理某个连接上的业务时,整个进程无法处理其它连接事件,很容易导致性能瓶颈;可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障

使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度为 O(1) 的情况

工作原理示意图

实现

客户端发送数据代码:

package com.company;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;


public class ReactorClient {
    public void start() throws IOException, InterruptedException {
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(12345));
        socketChannel.configureBlocking(false);
        socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);

        //不断的自旋、等待连接完成
        while (!socketChannel.finishConnect()) {
            // do
        }

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //发送数据到服务器
        for (int i = 0; i < 100; i++) {
            String data = "hello zhe " + i + " --- " + LocalDateTime.now();
            System.out.println(data);

            //buffer写模式
            buffer.put(data.getBytes());

            //buffer读模式
            buffer.flip();
            socketChannel.write(buffer);

            //buffer切换写模式
            buffer.clear();
            Thread.sleep(5000);
        }

        //关闭
        socketChannel.shutdownOutput();
        socketChannel.close();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        new ReactorClient().start();
    }
}

服务器代码

package com.company;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;


public class ReactorServer implements Runnable {
    Selector selector;

    ServerSocketChannel serverSocketChannel;

    public ReactorServer() throws IOException {
        selector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(12345));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //接收accept事件

        System.out.println("ReactorServer初始化成功 - 1");
    }


    
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();

                Set selectionKeys = selector.selectedKeys();
                Iterator keyIterator = selectionKeys.iterator();

                while (keyIterator.hasNext()) {
                    SelectionKey selectionKey = keyIterator.next();

                    if (selectionKey.isAcceptable()) {
                        new ReactorAcceptor(selector, serverSocketChannel).run();
                    } else if (selectionKey.isReadable()) {
                        new ReactorHandler(selectionKey).run();
                    }

                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws IOException {
        Thread thread = new Thread(new ReactorServer());
        thread.start();
    }
}


class ReactorAcceptor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;

    public ReactorAcceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }

    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel != null) {
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);

                System.out.println("连接新注册:" + socketChannel.getRemoteAddress());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


class ReactorHandler implements Runnable {
    SelectionKey selectionKey;
    SocketChannel socketChannel;

    public ReactorHandler(SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
    }

    @Override
    public void run() {
        try {
            if (selectionKey.isReadable()) {
                readOperate();
            } else {
                //写操作
            }
        } catch (Exception e) {
            closeChannel();
        }
    }

    
    private synchronized void readOperate() throws IOException {
        socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int len = 0;
        while ((len = socketChannel.read(buffer)) > 0) {
            String str = new String(buffer.array(), 0, len);
            System.out.println("读取字符串 " + str);
        }
    }

    
    private void closeChannel() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (IOException e) {
            System.out.println("close error");
        }
        System.out.println("关闭连接");
    }

}
DEMO

Client:

Server:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/777464.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号