栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

网络编程IO复用方法

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

网络编程IO复用方法

文章目录
      • 一、select
        • 1. select接口
        • 2. select完整代码
      • 二、Poll
        • 1. poll接口
        • 2. poll支持的事件类型
        • 3. poll完整代码
      • 三、epoll
        • 1. epoll接口
        • 2. epoll完整代码
      • 四、LT和ET模式
        • 1. LT和ET的基本概念
        • 2. ET的epoll服务器完整代码
        • 3. 读取完缓冲区的客户端完整代码

一、select 1. select接口

IO复用的作用:


select的返回值是fd_set中产生时间的个数

fd_set就是一个长整型的数组,使用每个bit标记一个文件描述符,最大为FD_SETSIZE个位,select能监听描述符的上限。

一般的,我们使用宏来操作fd_set中的bit

#include
FD_ZERO(fd_set* fd_set);             // 清楚fd_set的所有位
FD_SET(int fd, fd_set* fdset)        // 设置fd_set的位fd
FD_CLR(int fd, fd_set* fdset)        // 清除fd_set的位fd
int FD_ISSET(int fd, fd_set* fdset)  // 测试fd_set的位fd是否被设置
2. select完整代码
#include
#include
#include
#include
#include
#include
#include

#define MAX 10

int socket_init(){
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd == -1){
        return -1;
    }
    struct sockaddr_in ser_addr;
    memset(&ser_addr, 0 ,sizeof(ser_addr));
    ser_addr.sin_family = AF_INET;
    ser_addr.sin_port = htons(8888);
    ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    int res = bind(sockfd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
    assert(res != -1);

    res = listen(sockfd, 5);
    assert(res != -1);

    return sockfd;
}

void fds_init(int* fds){
    if(NULL == fds){
        return ;
    }
    for(int i = 0; i < MAX; i++){
        fds[i] = -1;
    }
}

void fds_del(int* fds, int fd){
    if(NULL == fds){
        return ;
    }
    for(int i = 0; i < MAX; i++){   
        if(fds[i] == fd){
            fds[i] = -1;
            break;
        }
    }
}

void fds_add(int* fds, int fd){
    if(NULL == fds){
        return ;
    }
    for(int i = 0; i < MAX; i++){
        if(fds[i] == -1){
            fds[i] = fd;
            break;
        }
    }
}

int main(){
    int sockfd = socket_init();

    int fds[MAX];  // 存放可能有事件发生的描述符,比如 stdin : 0, stdout : 1, stderr : 2
    fds_init(fds);
    fds_add(fds, sockfd);

    fd_set read_fdset;  // 给select检测的位数组
    
    while(1){
        FD_ZERO(&read_fdset);
        int maxfd = -1;   // 记录描述符的最大值,使select只需要检测位数组read_fdset的前maxfd+1位

        // 遍历fds,把可能发生事件的文件描述符放入read_fdset
        for(int i = 0; i < MAX; i++){
            if(fds[i] == -1){
                continue;
            }
            FD_SET(fds[i], &read_fdset);
            maxfd =  fds[i] > maxfd ? fds[i] : maxfd;
        }

        struct timeval tv = {5, 0};
        int n = select(maxfd + 1, &read_fdset, NULL, NULL, &tv);
        if( n == -1 ){
            continue;
        }else if(n == 0){
            printf("select timeout!n");
        }else{
            // 找到发生事件的n个描述符
            for(int i = 0; i < MAX; i++){
                if(fds[i] == -1){
                    continue;
                }
                if(FD_ISSET(fds[i], &read_fdset)){
                    // 服务器可能有监听事件 和 接收数据的事件
                    if(fds[i] == sockfd){
                        struct sockaddr_in cli_addr;
                        int len = sizeof(cli_addr);
                        int conn = accept(sockfd, (struct sockaddr*)&cli_addr, &len);
                        if(conn < 0){
                            printf("客户端连接失败!n");
                            continue;
                        }
                        printf("客户端:%d 连接成功n", conn);
                        fds_add(fds, conn);
                    }else{
                        char buff[128];
                        memset(buff, 0, 128);
                        int num = recv(fds[i], buff, 127, 0);
                        if(num <= 0){
                            printf("客户端:%d 关闭n", fds[i]);
                            close(fds[i]);
                            fds_del(fds, fds[i]);
                            continue;   // 当前描述符关闭,再检测下一个描述符
                        }
                        printf("read:%sn", buff);
                        send(fds[i], buff, num, 0);
                    }
                }
            }
        }
    } 
    return 0;
}
二、Poll 1. poll接口

poll 系统调用和 select 类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者

 int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • poll 系统调用成功返回就绪文件描述符的总数,超时返回 0,失败返回-1
  • nfds 参数指定被监听事件集合 fds 的大小。
  • timeout 参数指定 poll 的超时值,单位是毫秒,timeout 为-1 时,poll 调用将永久阻塞,直到某个事件发生,timeout 为 0 时,poll 调用将立即返回。

fds 参数是一个 struct pollfd 结构类型的数组,它指定所有用户感兴趣的文件描述符上发生的可读、可写和异常等事件。pollfd 结构体定义如下:

struct pollfd{
   int fd; // 文件描述符
   short events; // 注册的关注事件类型
   short revents; // 实际发生的事件类型,由内核填充
};

其中:

  • fd 成员指定文件描述符
  • events 成员告诉 poll 监听 fd 上的哪些事件类型,它是一系列事件的按位或
  • revents 成员则有内核修改,通知应用程序 fd上实际发生了哪些事件
2. poll支持的事件类型

3. poll完整代码
#define _GNU_SOURCE
#include
#include
#include
#include
#include
#include
#include
#include



#define MAX 10

int socket_init(){
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if(listenfd == -1){
        return -1;
    }
    struct sockaddr_in ser_addr;
    memset(&ser_addr, 0 ,sizeof(ser_addr));
    ser_addr.sin_family = AF_INET;
    ser_addr.sin_port = htons(8888);
    ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    int res = bind(listenfd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
    assert(res != -1);

    res = listen(listenfd, 5);
    assert(res != -1);

    return listenfd;
}

// struct pollfd fds[]中存放所有的描述符,根据revents判断是否有事件发生
void fds_init(struct pollfd fds[]){
    if(NULL == fds){
        return ;
    }
    for(int i = 0; i < MAX; i++){
        fds[i].fd = -1;
        fds[i].events = 0;
        fds[i].revents = 0;
    }
}

void fds_add(struct pollfd fds[], int fd){
    if(NULL == fds){
        return ;
    }
    for(int i = 0; i < MAX; i++){
        if( -1 == fds[i].fd ){
            fds[i].fd = fd;
            fds[i].events = POLLIN | POLLRDHUP; // read事件
            fds[i].revents = 0;     // 这由内核填充
            break;
        }
    }
}

void fds_del(struct pollfd fds[], int fd){
    if(NULL == fds){
        return ;
    }
    for(int i = 0; i < MAX; i++){   
        if(fd == fds[i].fd){
            fds[i].fd = -1;
            fds[i].events = 0; 
            fds[i].revents = 0;
            break;
        }
    }
}

int main(){
    int listenfd = socket_init();

    struct pollfd fds[MAX]; // 存放可能发生事件的描述符
    fds_init(fds);
    fds_add(fds, listenfd);
    
    while(1){
        int n = poll(fds, MAX, 5000);  // 等待5s无事件,则下一轮循环,有事件则处理事件
        if(n == -1){
            printf("poll失败!n");
            continue;
        }else if(n == 0){
            printf("poll timeout!n");
            continue;
        }else{
            // 找到发生事件的n个描述符
            for(int i = 0; i < MAX; i++){
                if(fds[i].fd == -1){
                    continue;
                }
                // 一旦客户端关闭,都会收到POLLRDHUP事件
                if(fds[i].revents & POLLRDHUP){
                    fds_del(fds, fds[i].fd);
                    close(fds[i].fd);
                    continue;
                }
                // 由于只是设置了读事件,这里只检查读事件
                if(fds[i].revents & POLLIN){
                    if(listenfd == fds[i].fd){
                        // accept处理
                        struct sockaddr_in cli_addr;
                        int len = sizeof(cli_addr);
                        int conn = accept(listenfd, (struct sockaddr*)&cli_addr, &len);
                        if(conn < 0){
                            printf("客户端连接失败!n");
                            continue;
                        }
                        printf("client %s:%d 连接成功,使用的描述符:%dn", 
                            inet_ntoa(((struct sockaddr_in)cli_addr).sin_addr), 
                            ntohs(((struct sockaddr_in)cli_addr).sin_port), 
                            conn);
                        fds_add(fds, conn);
                    }else{
                        char buff[128] = {0};
                        int num = recv(fds[i].fd, buff, 127, 0);
                        if(num <= 0){
                            printf("客户端:%d 关闭n", fds[i].fd);
                            fds_del(fds, fds[i].fd);
                            close(fds[i].fd);
                            continue;   // 当前描述符关闭,再检测下一个描述符
                        }
                        printf("buff(%d):%sn",num, buff);
                        send(fds[i].fd, buff, num, 0);
                    }
                }
            }
        }
    } 
    return 0;
}
三、epoll 1. epoll接口

epoll 是 Linux 特有的 I/O 复用函数。它在实现和使用上与 select、poll 有很大差异。首先,epoll 使用一组函数来完成任务,而不是单个函数。其次,epoll 把用户关心的文件描述符上的事件放在内核里的一个事件表中。从而无需像select和poll那样每次调用都要重复传入文件描述符或事件集。但 epoll 需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。epoll是一组方法的总称,包括:

  • epoll_create(int size): 用于创建内核事件表,底层为红黑树。成功返回内核事件表的文件描述符,失败返回-1。size 参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大,传入的实参合法即可。
  • epoll_ctl(int epfd, int op, int fd, struct epoll_event *event): 用于操作内核事件表
  • epoll_wait(): 用于在一段超时时间内等待一组文件描述符上的事件
2. epoll完整代码
#define _GNU_SOURCE
#include
#include
#include
#include
#include
#include
#include
#include



#define MAX 10

int socket_init(){
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if(listenfd == -1){
        return -1;
    }
    struct sockaddr_in ser_addr;
    memset(&ser_addr, 0 ,sizeof(ser_addr));
    ser_addr.sin_family = AF_INET;
    ser_addr.sin_port = htons(8888);
    ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    int res = bind(listenfd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
    assert(res != -1);

    res = listen(listenfd, 5);
    assert(res != -1);

    return listenfd;
}

// epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
// epoll_fd:内核事件表的id,把描述符添加到内核事件表中,用于后面检测是否发生事件
void epoll_add(int epoll_fd, int fd){
    struct epoll_event event;  // 将fd封装成结构体后,放入内核事件表
    event.events = EPOLLIN | EPOLLRDHUP;    // 在内核事件表中注册事件
    event.data.fd = fd;        // 在内核事件表中注册描述符
    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1){
        perror("epoll_ctl add errorn");
    }
}

// 当客户端关闭连接,则把相应描述符从内核事件表中移除
void epoll_del(int epoll_fd, int fd){
    if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1){
        perror("epoll_ctl delete errorn");
    }
}

int main(){
    int listenfd = socket_init();
    
    // 创建内核事件表
    int epoll_fd = epoll_create(MAX);
    assert(epoll_fd != -1);
    
    // 添加描述符到内核事件表
    epoll_add(epoll_fd, listenfd);

    // 一次最多获取MAX个有事件的描述符,若有事件的描述符过多,则分多次获取
    // 存放有事件的描述符
    struct epoll_event events[MAX];
    while(1){
        // int epoll_wait (int __epfd, struct epoll_event *__events, int __maxevents, int __timeout);
        // 从内核事件表epoll_fd中获取有事件的描述符以struct epoll_event的形式放在events
        // epoll_wait的返回值不大于MAX
        int n = epoll_wait(epoll_fd, events, MAX, 5000);
        if(n == -1){
            perror("epoll wait errorn");
        }else if(n == 0){
            perror("epoll timeoutn");
        }else{
            // select和poll需要遍历所有的文件描述符
            // 而epoll不需要遍历所有的,只需要前n个元素即可
            for(int i = 0; i < n; i++){
                int cur_fd = events[i].data.fd;
                // 一旦客户端关闭,都会收到POLLRDHUP事件
                if(events[i].events & POLLRDHUP){
                    printf("client:%d hup closen", cur_fd);
                    epoll_del(epoll_fd, cur_fd);
                    close(cur_fd);
                    continue;
                }
                if(events[i].events & EPOLLIN){
                    if(cur_fd == listenfd){
                        // accept
                        struct sockaddr_in cli_addr;
                        int len = sizeof(cli_addr);
                        int conn = accept(listenfd, (struct sockaddr*)&cli_addr, &len);
                        if(conn < 0){
                            printf("客户端连接失败!n");
                            continue;
                        }
                        printf("client %s:%d 连接成功,使用的描述符:%dn", 
                            inet_ntoa(((struct sockaddr_in)cli_addr).sin_addr), 
                            ntohs(((struct sockaddr_in)cli_addr).sin_port), 
                            conn);
                        // 新的描述符添加到内核事件表
                        epoll_add(epoll_fd, conn);
                    }else{
                        // recv
                        char buff[128] = {0};
                        int num = recv(cur_fd, buff, 127, 0);
                        if(num <= 0){
                            printf("客户端:%d 关闭n", cur_fd);
                            epoll_del(epoll_fd, cur_fd);
                            close(cur_fd);
                            continue;   // 当前描述符关闭,再检测下一个描述符
                        }
                        printf("buff(%d):%sn",num, buff);
                        send(cur_fd, buff, num, 0); 
                    }
                }
            }
        }
    }
    return 0;
}

四、LT和ET模式 1. LT和ET的基本概念

epoll 对文件描述符有两种操作模式:LT(Level Trigger,电平触发)模式和 ET(Edge Trigger,边沿触发)模式。LT 模式是默认的工作模式。当往 epoll 内核事件表中注册一个文件描述符上的EPOLLET 事件时,epoll 将以高效的 ET 模式来操作该文件描述符。

对于 LT 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用 epoll_wait 时,还会再次向应用程序通告此事件,直到该事件被处理。

对于 ET 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的 epoll_wait 调用将不再向应用程序通知这一事件。等事件再次发生,才会再次提醒。 所以 ET 模式在很大程度上降低了同一个 epoll 事件被重复触发的次数,因此效率比 LT 模式高。

epoll默认处于LT模式,也就是内核会不断提醒应用程序,知道程序处理完数据

int num = recv(cur_fd, buff, 1, 0);

2. ET的epoll服务器完整代码
  1. 文件描述符开启 ET模式
  2. recv设置为 非阻塞,缓冲区没数据的时候返回-1,而不是阻塞
  3. 循环读取 缓冲区的数据,直到读完
#define _GNU_SOURCE
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define RD_FIN_STR "服务器数据读取完成n"
#define MAX 10

int socket_init(){
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if(listenfd == -1){
        return -1;
    }
    struct sockaddr_in ser_addr;
    memset(&ser_addr, 0 ,sizeof(ser_addr));
    ser_addr.sin_family = AF_INET;
    ser_addr.sin_port = htons(8888);
    ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    int res = bind(listenfd, (struct sockaddr*)&ser_addr, sizeof(ser_addr));
    assert(res != -1);

    res = listen(listenfd, 5);
    assert(res != -1);

    return listenfd;
}

void set_nonblock(int fd){
    // 获取文件描述符fd的属性
    int old_feature = fcntl(fd, F_GETFL);
    int new_feature = old_feature | O_NONBLOCK;
    // 将新属性设置到fd
    if(fcntl(fd, F_SETFL, new_feature) == -1){
        perror("fcntl errorn");
    }
}

// epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
// epoll_fd:内核事件表的id,把描述符添加到内核事件表中,用于后面检测是否发生事件
void epoll_add(int epoll_fd, int fd){
    struct epoll_event event;  // 将fd封装成结构体后,放入内核事件表
    event.events = EPOLLIN | EPOLLRDHUP | EPOLLET;    // 在内核事件表中注册事件
    event.data.fd = fd;        // 在内核事件表中注册描述符
    set_nonblock(fd);          // 将描述符设置为非阻塞

    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1){
        perror("epoll_ctl add errorn");
    }
}

// 当客户端关闭连接,则把相应描述符从内核事件表中移除
void epoll_del(int epoll_fd, int fd){
    if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1){
        perror("epoll_ctl delete errorn");
    }
}


int main(){
    int listenfd = socket_init();
    
    // 创建内核事件表
    int epoll_fd = epoll_create(MAX);
    assert(epoll_fd != -1);
    
    // 添加描述符到内核事件表
    epoll_add(epoll_fd, listenfd);

    // 一次最多获取MAX个有事件的描述符,若有事件的描述符过多,则分多次获取
    // 存放有事件的描述符
    struct epoll_event events[MAX];
    while(1){
        // int epoll_wait (int __epfd, struct epoll_event *__events, int __maxevents, int __timeout);
        // 从内核事件表epoll_fd中获取有事件的描述符以struct epoll_event的形式放在events
        // epoll_wait的返回值不大于MAX
        printf("开始epoll_waitn");
        int n = epoll_wait(epoll_fd, events, MAX, 5000);
        if(n == -1){
            perror("epoll wait errorn");
        }else if(n == 0){
            perror("epoll timeoutn");
        }else{
            // select和poll需要遍历所有的文件描述符
            // 而epoll不需要遍历所有的,只需要前n个元素即可
            for(int i = 0; i < n; i++){
                int cur_fd = events[i].data.fd;
                // 一旦客户端关闭,都会收到POLLRDHUP事件
                if(events[i].events & POLLRDHUP){
                    printf("client:%d hup closen", cur_fd);
                    epoll_del(epoll_fd, cur_fd);
                    close(cur_fd);
                    continue;
                }
                if(events[i].events & EPOLLIN){
                    if(cur_fd == listenfd){
                        // accept
                        struct sockaddr_in cli_addr;
                        int len = sizeof(cli_addr);
                        int conn = accept(listenfd, (struct sockaddr*)&cli_addr, &len);
                        if(conn < 0){
                            printf("客户端连接失败!n");
                            continue;
                        }
                        printf("client %s:%d 连接成功,使用的描述符:%dn", 
                            inet_ntoa(((struct sockaddr_in)cli_addr).sin_addr), 
                            ntohs(((struct sockaddr_in)cli_addr).sin_port), 
                            conn);
                        // 新的描述符添加到内核事件表
                        epoll_add(epoll_fd, conn);
                    }else{
                        // recv
                        while(1){
                            char buff[128] = {0};
                            // fd已经被设置为非阻塞,读取不到不会阻塞,而回返回-1
                            int num = recv(cur_fd, buff, 1, 0);
                            if(num == -1){
                                // 不是因为数据读完而出错
                                // errno是errno.h中的一个全局变量,如果读取出错,内核会填写errno
                                if(errno != EAGAIN && errno != EWOULDBLOCK){
                                    perror("recv errorn");
                                }else{
                                    // 数据读完导致num == -1
                                    printf("数据读取完成n");
                                    send(cur_fd, RD_FIN_STR, sizeof(RD_FIN_STR), 0);
                                }
                                break; // 数据读完或出错,退出接收数据
                            }else if(num == 0){
                                printf("客户端:%d 关闭n", cur_fd);
                                epoll_del(epoll_fd, cur_fd);
                                close(cur_fd);
                                break;   // 客户端关闭,退出接收数据
                            }else{
                                printf("buff(%d):%sn",num, buff);
                                send(cur_fd, buff, num, 0); 
                            }
                        }
                    }
                }
            }
        }
    }
    return 0;
}
3. 读取完缓冲区的客户端完整代码
#define _GNU_SOURCE
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

void set_nonblock(int fd){
    // 获取文件描述符fd的属性
    int old_feature = fcntl(fd, F_GETFL);
    int new_feature = old_feature | O_NONBLOCK;
    // 将新属性设置到fd
    if(fcntl(fd, F_SETFL, new_feature) == -1){
        perror("fcntl errorn");
    }
}

int main(){
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd == -1){
        printf("create socket failed!n");
        return 0;
    }
    struct sockaddr_in cli_addr;
    memset(&cli_addr, 0, sizeof(cli_addr));

    cli_addr.sin_family = AF_INET;    // 地址族
    cli_addr.sin_port = htons(8888);  // host to net short
    cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); // inet_addr将字符串转为无符号整型

    // 可以将套接字绑定ip,但一般客户端不绑定,让OS随机分配port
    int res = connect(sockfd, (struct sockaddr*)&cli_addr, sizeof(cli_addr));  // 连接server
    assert(res != -1);
    set_nonblock(sockfd);

    while(1){
        char buff[128] = {0};
        printf("input:");
        fflush(stdout);
        fgets(buff, 128, stdin);
        if(strcmp(buff, "exitn") == 0){
            break;
        }
        send(sockfd, buff, strlen(buff), 0);
        while(1){
            memset(buff, 0 ,128);
            // 客户端send后,给服务器充足的时间读取以及回复
            sleep(1);
            // 从缓冲区尽可能多的取数据,最多取127字节
            int n = recv(sockfd, buff, 127, 0);
            if(n == -1){
                // 不是因为数据读完而出错
                // errno是errno.h中的一个全局变量,如果读取出错,内核会填写errno
                if(errno != EAGAIN && errno != EWOULDBLOCK){
                    perror("recv errorn");
                }
                // else{
                //     printf("服务器读完了,breakn");
                // }
                break; // 数据读完或出错,退出接收数据
            }else if(n == 0){
                printf("服务端:%d 关闭n", sockfd);
                break;   // 客户端关闭,退出接收数据
            }else{
                printf("buff(%d):%sn",n, buff);
            }
        }
    }
    close(sockfd);
    return 0;
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/450815.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号