栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

多线程改进Selector

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

多线程改进Selector

前面的代码在服务器端都是单线程配合一个 Selector 选择器来管理多个 channel 上的事件

这样有两个缺点

  1. 现在都是多核CPU,一个线程只能用一个核心,其他的核心会白白浪费
  2. 单线程是可以处理多个事件,但是如果某个事件耗时较长,就会造成其他事件的等待

如果让你设计一种较为合理的架构,你会怎样设计呢?

首先一点要把多核CPU 充分利用起来,第二就是每个线程对应自己的职责,例如,店小二负责接待,厨师负责炒菜,服务员负责记录菜单

代码实现

服务器

package com.zhao.c1;

import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;



@Slf4j
public class MultiThreadServer {

    public static void main(String[] args) throws IOException {
        // 设置main 名称为 Boss
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();

        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));

        // 1. 创建固定数量的 work 并初始化
        Work work = new Work("work0");
        work.register();
        while (true) {

            // 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞
            boss.select();
            Iterator iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    // accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{}",sc.getRemoteAddress());
                    // 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)
                    // 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selector
                    log.debug("before register...{}",sc.getRemoteAddress());
                    sc.register(work.selector,SelectionKey.OP_READ,null);
                    log.debug("after register...{}",sc.getRemoteAddress());
                }

            }
        }
    }

    static class Work implements Runnable {
        private Thread thread;   // 独立的线程
        private Selector selector;  // 独立的监听器
        private String name; // work  的名字

        private volatile boolean start; // 刚开始线程还未初始化

        public Work(String name) {
            this.name = name;
        }

        // 初始化 Thread 和 Selector
        public void register() throws IOException {
            if (!start) {
                thread = new Thread(this, name);  // 创建一个新线程,就是当前类
                thread.start();
                selector = Selector.open();
                start = true;
            }
        }

        @Override
        public void run() {
            while (true){
                try {
                    selector.select();
                    Iterator iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()){
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isReadable()){
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            // 读取到了 channel 中的数据
                            log.debug("read...{}",channel.getRemoteAddress());
                            channel.read(buffer);
                            // 切换至读模式
                            buffer.flip();
                            ByteBufferUtil.debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

客户端

package com.zhao.c1;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;


public class TestClient {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8080));
        sc.write(Charset.defaultCharset().encode("1234567890abcdef"));
        System.in.read();
    }

}

一切看似风平浪静,但是客户端已经向服务器发送了数据,服务器却没有打印,只显示了连接

原因是线程的先后顺序不一致导致的

注册事件

阻塞事件 

 如果先发生注册事件,再发生阻塞事件,客户端此时发送消息过来,run 方法中是不会再发生阻塞,会把这个事件处理完毕。

如果先发生阻塞事件,再发生注册事件,客户端此时发送消息过来,你的 read 事件还没注册到 Selector 上面,selector.select()会认为你现在没有事件要处理,会一直阻塞。

这个解决起来稍微有一点复杂,因为你想让注册事件发生在阻塞事件前面,阻塞事件是在另一个线程里边,所以得想办法把注册事件也放在这个线程里边,并且先于阻塞事件发生。

这可以通过一个队列来解决,先把我要做的事情存放到一个消息队列里边,执行另一个线程的 run 方法之前,先执行我消息队列的事件,这样一来就解决了 阻塞事件阻塞注册事件的问题。

package com.zhao.c1;

import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentlinkedDeque;
import java.util.concurrent.ConcurrentlinkedQueue;



@Slf4j
public class MultiThreadServer {

    public static void main(String[] args) throws IOException {
        // 设置main 名称为 Boss
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();

        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));

        // 1. 创建固定数量的 work 并初始化
        Work work = new Work("work0");
        while (true) {

            // 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞
            boss.select();
            Iterator iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    // accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{}", sc.getRemoteAddress());
                    // 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)
                    // 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selector
                    log.debug("before register...{}", sc.getRemoteAddress());
                    work.register(sc);
                    log.debug("after register...{}", sc.getRemoteAddress());
                }

            }
        }
    }

    static class Work implements Runnable {
        private Thread thread;   // 独立的线程
        private Selector selector;  // 独立的监听器
        private String name; // work  的名字

        private volatile boolean start; // 刚开始线程还未初始化

        private ConcurrentlinkedQueue queue = new ConcurrentlinkedQueue<>();  // 两个线程通道

        public Work(String name) {
            this.name = name;
        }

        // 初始化 Thread 和 Selector
        public void register(SocketChannel sc) throws IOException {
            if (!start) {
                thread = new Thread(this, name);  // 创建一个新线程,就是当前类
                thread.start();
                selector = Selector.open();
                start = true;
            }
            queue.add(() -> {
                try {
                    sc.register(selector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });

            selector.wakeup();  // 通知阻塞放行
        }

        @Override
        public void run() {
            while (true) {
                try {
                    selector.select(); // work0,阻塞,使用 wakeup 唤醒

                    Runnable task = queue.poll();
                    if (task != null) {
                        task.run();   // 执行了 sc.register(selector, SelectionKey.OP_READ, null);
                    }

                    Iterator iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            // 读取到了 channel 中的数据
                            log.debug("read...{}", channel.getRemoteAddress());
                            channel.read(buffer);
                            // 切换至读模式
                            buffer.flip();
                            ByteBufferUtil.debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

设置多个 Worker,Worker 的个数根据 Cpu 的核数定的,可以用轮询做到

具体实现

创建一个 Workers 数组,容量为 CPU 的核数

创建一个原子 Integer 类

获得workers 数组中的 其中一个元素 = index.getAndIncrement() % works.length

package com.zhao.c1;

import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentlinkedDeque;
import java.util.concurrent.ConcurrentlinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;



@Slf4j
public class MultiThreadServer {

    public static void main(String[] args) throws IOException {
        // 设置main 名称为 Boss
        Thread.currentThread().setName("boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();

        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));

        Work[] works = new Work[Runtime.getRuntime().availableProcessors()];
        for (int i = 0; i < works.length; i++) {
            works[i] = new Work("worker- "+i);
        }

        AtomicInteger index = new AtomicInteger();

        // 1. 创建固定数量的 work 并初始化
        while (true) {

            // 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞
            boss.select();
            Iterator iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    // accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{}", sc.getRemoteAddress());
                    // 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)
                    // 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selector
                    log.debug("before register...{}", sc.getRemoteAddress());

                    // round robin 轮询
                    works[index.getAndIncrement() % works.length].register(sc);
                    log.debug("after register...{}", sc.getRemoteAddress());
                }

            }
        }
    }

    static class Work implements Runnable {
        private Thread thread;   // 独立的线程
        private Selector selector;  // 独立的监听器
        private String name; // work  的名字

        private volatile boolean start; // 刚开始线程还未初始化

        private ConcurrentlinkedQueue queue = new ConcurrentlinkedQueue<>();  // 两个线程通道

        public Work(String name) {
            this.name = name;
        }

        // 初始化 Thread 和 Selector
        public void register(SocketChannel sc) throws IOException {
            if (!start) {
                thread = new Thread(this, name);  // 创建一个新线程,就是当前类
                thread.start();
                selector = Selector.open();
                start = true;
            }
            queue.add(() -> {
                try {
                    sc.register(selector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });

            selector.wakeup();  // 通知阻塞放行
        }

        @Override
        public void run() {
            while (true) {
                try {
                    selector.select(); // work0,阻塞,使用 wakeup 唤醒

                    Runnable task = queue.poll();
                    if (task != null) {
                        task.run();   // 执行了 sc.register(selector, SelectionKey.OP_READ, null);
                    }

                    Iterator iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            // 读取到了 channel 中的数据
                            log.debug("read...{}", channel.getRemoteAddress());
                            channel.read(buffer);
                            // 切换至读模式
                            buffer.flip();
                            ByteBufferUtil.debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

NIO vs BIO stream vs channel
  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)

  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用

  • 二者均为全双工,即读写可以同时进行

IO 模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞 

当一个应用程序调用了 read 方法引发的事件

阻塞IO

一旦等待数据处发生阻塞,这个线程就什么也做不了了,只能老老实实的等待数据。不光用户态要等,内核态也要等。就像是一个单道环,没有回头路。

张三去买一包 程序员安慰剂,到了药店,药店老板说要等一阵子才有货,等一天或者等一年,张三都在等,一直等到货来了,买到了,才回去。

非阻塞IO

一旦用户态发现内核态没有数据,就会立刻返回。下一次 where(true) 循环的时候,再去查看有无数据,一旦有数据了,就不会立刻返回。而是复制数据并返回。

张三去买一包 程序员安慰剂,到了药店,药店老板说要等一阵子才有货,张三一听转身就走,第二次,张三又去了,老板说有货,张三就买到货并返回。张三买货的期间是不能做其他事情的

多路复用

用户态调用 select 方法,查看内核是否有事件,没有事件则阻塞,有事件内核会通知用户态有新事件发生。用户线程就会调用 read 方法进行读取,需要复制数据,就会发生阻塞

 张三去买一包 程序员安慰剂,但是他不清楚药店有没有货,就打电话给药店老板,老板说现在没有,等有了我告诉你。过了一两天,有货了,老板打电话给张三,让他来拿货。于是张三过去买了一包又回来了。

与阻塞IO不同的是,多路复用每次获取的是一批事件。

例如:张三需要吃饭,吃完饭后还要写代码,写完代码后又要去吃饭。这之中发生了等待时间就只能在这里等待了。例如,去吃饭饭店没菜了,厨子去买菜,饭什么时候可以做好,谁也说不清楚。写代码,停电了,什么时候来电,谁也不知道。再次期间就只能干等。多路复用则不同,没菜了?我不去,等你有菜了,来电了我再做事情,例如,饭店打电话说现在没菜了,那张三就会呆在家,哪里也不去,公司打电话说断电了,张三也哪里都不会去。直到饭店说买到菜了,公司说来电了,张三才会起身去吃饭,去公司上班。他获取的是一批事件,不会陷入某一个事件中等待。

通过上面的案例,可以得出一个结论,就是当用户线程去处理请求,说明此时已经有一个或多个请求要处理了,等待的时间已经在 select 处等待过了,一次性的处理多个事件。

同步:线程自己去获取结果(一个线程) 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)

阻塞IO :自己调用read 方法,自己接收请求。同步阻塞

阻塞IO :自己调用read 方法,自己接收请求。同步非阻塞

多路复用:自己调用read 方法,自己接收请求。同步

异步非阻塞:当前线程调用完 Thread1 后会向下继续运行,不会阻塞

张三想买一包 程序员安慰剂,但自己又很忙,于是他叫来自己的小弟李四去买,张三去上班了,李四买完药后,回到家,拿给张三。

package com.zhao.c1;

import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;



@Slf4j
public class AioFileChannel {

    public static void main(String[] args) throws IOException {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {

            // 参数1:ByteBuffer
            // 参数2:读取的起始位置
            // 参数3:附件
            // 参数4:回调对象
            ByteBuffer buffer = ByteBuffer.allocate(16);

            log.debug("read begin");
            channel.read(buffer, 0, buffer, new CompletionHandler() {

                // read 成功调用此方法
                
                @Override
                public void completed(Integer result, ByteBuffer attachment) {

                    log.debug("read completed");
                    attachment.flip();
                    ByteBufferUtil.debugAll(attachment);
                }

                // read 失败调用此方法
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                }
            });
            log.debug("read end");
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.in.read();

    }

}

主线程打印完 read begin 和 read end 后,Thread 1 才开始运行.

零拷贝
package com.zhao.c1;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;

public class ClientDemo {
    public static void main(String[] args) throws IOException {
        //创建客户端的Socket对象(Socket)
        //Socket(String host, int port) 创建流套接字并将其连接到指定主机上的指定端口号
        Socket socket = new Socket("192.168.1.66",10000);

        File f = new File("data.txt");
        RandomAccessFile file = new RandomAccessFile(f,"r");

        byte[] buf = new byte[(int)f.length()];

        file.read(buf);

        //获取输出流,写数据
        socket.getOutputStream().write(buf);

        //释放资源
        socket.close();
    }
}

以上程序用户态和内核态之间的转换如下图

程序调用read 方法后,会有用户态转换成内核态。方法调用结束会有内核态转换为用户态,返回结果。会发生 1,2步骤的内容

程序调用 write 方法,会有用户态转换为内核态,会发生 3,4步骤的内容

程序运行,会发生3次 用户态与内核态的转换,4次 数据复制

二次优化

在 Netty基础 NIO中有介绍过使用 ByteBuffer.allocateDirect(16),创建的 buffer 使用的是直接内存,会减少一次拷贝。

buffer使用的是操作系统中的内存,当磁盘想缓冲区写入时,byte 数组和内核缓冲区用的是同一块内存。减少了一次拷贝

三次优化

无论是使用原始读取还是使用直接内存的 ByteBuffer,都绕不开java程序,linux 2.1 后提供的 sendFile 方法 ,java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

现在只需要 一次用户态到内核态的切换,是在  transferTo 方法发生后,3次 数据复制

四次优化

linux 2.4

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

零拷贝,不需要在java 层面发生拷贝,拷贝只发生在网络与操作系统之间

优点和适用场景

  • 更少的用户态与内核态的切换

  • 不利用 cpu 计算,减少 cpu 缓存伪共享

  • 零拷贝适合小文件传输

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/435047.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号