栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

基于Java Selector实现网络通信

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于Java Selector实现网络通信

基于Java Selector实现网络通信 服务端代码

整体思路

    启动一个ServerSocket,注册到Selector上无限轮询,从Selector上获取有事件的Socket根据事件Socket类型,进行accept或者read处理

代码

package zx.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;


public class SelectorServer {
    private static ServerSocketChannel serverSocketChannel;
    private static Selector selector;
    static int port = 8848;

    public static void main(String[] args) {
        start();
    }

    
    private static void start() {
        initServer();
        System.out.println("服务器启动 " + serverSocketChannel);
        try {
            while (true) {
                //获取当前selector上所有注册的socket的包装key,本质上代表的是注册的socket
                Set keys = selector.keys();
                System.out.println("当前注册的socket个数= " + keys.size());
                //获取有事件的socket
                while (selector.select(5000L) > 0) {
                    //获取有事件的socket
                    Set selectedKeys = selector.selectedKeys();
                    Iterator keyIterator = selectedKeys.iterator();
                    //遍历有事件的socket
                    while (keyIterator.hasNext()) {
                        SelectionKey selectedKey = keyIterator.next();
                        keyIterator.remove();
                        if (selectedKey.isAcceptable()) {
                            acceptHandler(selectedKey);
                        } else if (selectedKey.isReadable()) {
                            readHandler(selectedKey);
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    
    private static void initServer() {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            //创建一个selector
            selector = Selector.open();
            //将socket注册到selector上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    private static void readHandler(SelectionKey selectedKey) {
        //获取连接,将连接再次注册到selector中
        SocketChannel connect = (SocketChannel) selectedKey.channel();
        //连接对应的buffer也封装在key中
        ByteBuffer buffer = (ByteBuffer) selectedKey.attachment();
        buffer.clear();
        int read = 0;
        try {
            read = connect.read(buffer);
            while (read > 0) {
                buffer.flip();
                //把读到的数据再写回去
                while (buffer.hasRemaining()) {
                    connect.write(buffer);
                }
                buffer.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    
    private static void acceptHandler(SelectionKey selectedKey) {
        try {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectedKey.channel();
            SocketChannel connect = serverSocketChannel.accept();
            connect.configureBlocking(false);
            ByteBuffer byteBuffer = ByteBuffer.allocate(8092);
            //将当前连接注册到selector上,关注read事件
            connect.register(selector, SelectionKey.OP_READ, byteBuffer);
            System.out.println("建立新连接,完成注册 " + connect);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

底层系统调用分析

首先基于POLL实现,在启动时,指定使用POLL

-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider

使用strace可以追踪进程的系统调用,启动命令如下

javac SelectorServer.java
strace -ff -o server/out java -Djava.nio.channels.spi.SelectorProvid=sun.nio.ch.PollSelectorProvider SelectorServer

POLL模型主流程的系统调用如下

//创建socket、bind、listen、设置非阻塞
socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4;
bind(4, {sa_family=AF_INET, sin_port=htons(8848), sin_addr=inet_addr("0.0.0.0")}, 16) = 0;
listen(4, 50);
socket(PF_LOCAL, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0) = 3;

//执行poll调用,阻塞指定的时长,返回0,代表无事件,返回1,代表有事件
//fd=6是一个管道,先不管;fd=4,就是我们的server socket,POLLIN,代表监听连接的建立
poll([{fd=6, events=POLLIN}, {fd=4, events=POLLIN}], 2, 5000) = 0 (Timeout);
//当创建连接后,将连接也加入,则每次poll会多一个文件描述符
poll([{fd=6, events=POLLIN}, {fd=4, events=POLLIN}, {fd=8, events=POLLIN}], 3, 5000);
//得到连接,接收连接
accept(4, {sa_family=AF_INET, sin_port=htons(40076), sin_addr=inet_addr("127.0.0.1")}, [16]) = 9;

基于EPOLL实现,分析系统调用如下

//创建socket、bind、listen、设置非阻塞
socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4;
bind(4, {sa_family=AF_INET, sin_port=htons(8848), sin_addr=inet_addr("0.0.0.0")}, 16) = 0;
listen(4, 50);
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0;

//执行epoll_create,创建一个fd,epoll本质上是一个fd
epoll_create(256) = 8;
//执行epoll_ctl,将server socket添加到epoll监听上,监听EPOLLIN
epoll_ctl(8, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=1379023911806566404}}) = 0;
//执行epoll_wait,等待超时的时长,有连接返回1,无连接返回0
epoll_wait(8, {{EPOLLIN, {u32=4, u64=1379023911806566404}}}, 8192, 5000) = 1;
//得到连接,接收连接,设置非阻塞
accept(4, {sa_family=AF_INET, sin_port=htons(40076), sin_addr=inet_addr("127.0.0.1")}, [16]) = 9;
fcntl(9, F_SETFL, O_RDWR|O_NONBLOCK)    = 0;
//将得到的连接注册到epoll
epoll_ctl(8, EPOLL_CTL_ADD, 9, {EPOLLIN, {u32=9, u64=17698686218151133193}}) = 0;
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/709311.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号