- 一、select
- 1. select接口
- 2. select完整代码
- 二、Poll
- 1. poll接口
- 2. poll支持的事件类型
- 3. poll完整代码
- 三、epoll
- 1. epoll接口
- 2. epoll完整代码
- 四、LT和ET模式
- 1. LT和ET的基本概念
- 2. ET的epoll服务器完整代码
- 3. 读取完缓冲区的客户端完整代码
IO复用的作用:
select的返回值是fd_set中产生时间的个数
fd_set就是一个长整型的数组,使用每个bit标记一个文件描述符,最大为FD_SETSIZE个位,select能监听描述符的上限。
一般的,我们使用宏来操作fd_set中的bit
#include2. select完整代码FD_ZERO(fd_set* fd_set); // 清楚fd_set的所有位 FD_SET(int fd, fd_set* fdset) // 设置fd_set的位fd FD_CLR(int fd, fd_set* fdset) // 清除fd_set的位fd int FD_ISSET(int fd, fd_set* fdset) // 测试fd_set的位fd是否被设置
#include二、Poll 1. poll接口#include #include #include #include #include #include #define MAX 10 int socket_init(){ int sockfd = socket(AF_INET, SOCK_STREAM, 0); if(sockfd == -1){ return -1; } struct sockaddr_in ser_addr; memset(&ser_addr, 0 ,sizeof(ser_addr)); ser_addr.sin_family = AF_INET; ser_addr.sin_port = htons(8888); ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); int res = bind(sockfd, (struct sockaddr*)&ser_addr, sizeof(ser_addr)); assert(res != -1); res = listen(sockfd, 5); assert(res != -1); return sockfd; } void fds_init(int* fds){ if(NULL == fds){ return ; } for(int i = 0; i < MAX; i++){ fds[i] = -1; } } void fds_del(int* fds, int fd){ if(NULL == fds){ return ; } for(int i = 0; i < MAX; i++){ if(fds[i] == fd){ fds[i] = -1; break; } } } void fds_add(int* fds, int fd){ if(NULL == fds){ return ; } for(int i = 0; i < MAX; i++){ if(fds[i] == -1){ fds[i] = fd; break; } } } int main(){ int sockfd = socket_init(); int fds[MAX]; // 存放可能有事件发生的描述符,比如 stdin : 0, stdout : 1, stderr : 2 fds_init(fds); fds_add(fds, sockfd); fd_set read_fdset; // 给select检测的位数组 while(1){ FD_ZERO(&read_fdset); int maxfd = -1; // 记录描述符的最大值,使select只需要检测位数组read_fdset的前maxfd+1位 // 遍历fds,把可能发生事件的文件描述符放入read_fdset for(int i = 0; i < MAX; i++){ if(fds[i] == -1){ continue; } FD_SET(fds[i], &read_fdset); maxfd = fds[i] > maxfd ? fds[i] : maxfd; } struct timeval tv = {5, 0}; int n = select(maxfd + 1, &read_fdset, NULL, NULL, &tv); if( n == -1 ){ continue; }else if(n == 0){ printf("select timeout!n"); }else{ // 找到发生事件的n个描述符 for(int i = 0; i < MAX; i++){ if(fds[i] == -1){ continue; } if(FD_ISSET(fds[i], &read_fdset)){ // 服务器可能有监听事件 和 接收数据的事件 if(fds[i] == sockfd){ struct sockaddr_in cli_addr; int len = sizeof(cli_addr); int conn = accept(sockfd, (struct sockaddr*)&cli_addr, &len); if(conn < 0){ printf("客户端连接失败!n"); continue; } printf("客户端:%d 连接成功n", conn); fds_add(fds, conn); }else{ char buff[128]; memset(buff, 0, 128); int num = recv(fds[i], buff, 127, 0); if(num <= 0){ printf("客户端:%d 关闭n", fds[i]); close(fds[i]); fds_del(fds, fds[i]); continue; // 当前描述符关闭,再检测下一个描述符 } printf("read:%sn", buff); send(fds[i], buff, num, 0); } } } } } return 0; }
poll 系统调用和 select 类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- poll 系统调用成功返回就绪文件描述符的总数,超时返回 0,失败返回-1
- nfds 参数指定被监听事件集合 fds 的大小。
- timeout 参数指定 poll 的超时值,单位是毫秒,timeout 为-1 时,poll 调用将永久阻塞,直到某个事件发生,timeout 为 0 时,poll 调用将立即返回。
fds 参数是一个 struct pollfd 结构类型的数组,它指定所有用户感兴趣的文件描述符上发生的可读、可写和异常等事件。pollfd 结构体定义如下:
struct pollfd{
int fd; // 文件描述符
short events; // 注册的关注事件类型
short revents; // 实际发生的事件类型,由内核填充
};
其中:
- fd 成员指定文件描述符
- events 成员告诉 poll 监听 fd 上的哪些事件类型,它是一系列事件的按位或
- revents 成员则有内核修改,通知应用程序 fd上实际发生了哪些事件
#define _GNU_SOURCE #include三、epoll 1. epoll接口#include #include #include #include #include #include #include #define MAX 10 int socket_init(){ int listenfd = socket(AF_INET, SOCK_STREAM, 0); if(listenfd == -1){ return -1; } struct sockaddr_in ser_addr; memset(&ser_addr, 0 ,sizeof(ser_addr)); ser_addr.sin_family = AF_INET; ser_addr.sin_port = htons(8888); ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); int res = bind(listenfd, (struct sockaddr*)&ser_addr, sizeof(ser_addr)); assert(res != -1); res = listen(listenfd, 5); assert(res != -1); return listenfd; } // struct pollfd fds[]中存放所有的描述符,根据revents判断是否有事件发生 void fds_init(struct pollfd fds[]){ if(NULL == fds){ return ; } for(int i = 0; i < MAX; i++){ fds[i].fd = -1; fds[i].events = 0; fds[i].revents = 0; } } void fds_add(struct pollfd fds[], int fd){ if(NULL == fds){ return ; } for(int i = 0; i < MAX; i++){ if( -1 == fds[i].fd ){ fds[i].fd = fd; fds[i].events = POLLIN | POLLRDHUP; // read事件 fds[i].revents = 0; // 这由内核填充 break; } } } void fds_del(struct pollfd fds[], int fd){ if(NULL == fds){ return ; } for(int i = 0; i < MAX; i++){ if(fd == fds[i].fd){ fds[i].fd = -1; fds[i].events = 0; fds[i].revents = 0; break; } } } int main(){ int listenfd = socket_init(); struct pollfd fds[MAX]; // 存放可能发生事件的描述符 fds_init(fds); fds_add(fds, listenfd); while(1){ int n = poll(fds, MAX, 5000); // 等待5s无事件,则下一轮循环,有事件则处理事件 if(n == -1){ printf("poll失败!n"); continue; }else if(n == 0){ printf("poll timeout!n"); continue; }else{ // 找到发生事件的n个描述符 for(int i = 0; i < MAX; i++){ if(fds[i].fd == -1){ continue; } // 一旦客户端关闭,都会收到POLLRDHUP事件 if(fds[i].revents & POLLRDHUP){ fds_del(fds, fds[i].fd); close(fds[i].fd); continue; } // 由于只是设置了读事件,这里只检查读事件 if(fds[i].revents & POLLIN){ if(listenfd == fds[i].fd){ // accept处理 struct sockaddr_in cli_addr; int len = sizeof(cli_addr); int conn = accept(listenfd, (struct sockaddr*)&cli_addr, &len); if(conn < 0){ printf("客户端连接失败!n"); continue; } printf("client %s:%d 连接成功,使用的描述符:%dn", inet_ntoa(((struct sockaddr_in)cli_addr).sin_addr), ntohs(((struct sockaddr_in)cli_addr).sin_port), conn); fds_add(fds, conn); }else{ char buff[128] = {0}; int num = recv(fds[i].fd, buff, 127, 0); if(num <= 0){ printf("客户端:%d 关闭n", fds[i].fd); fds_del(fds, fds[i].fd); close(fds[i].fd); continue; // 当前描述符关闭,再检测下一个描述符 } printf("buff(%d):%sn",num, buff); send(fds[i].fd, buff, num, 0); } } } } } return 0; }
epoll 是 Linux 特有的 I/O 复用函数。它在实现和使用上与 select、poll 有很大差异。首先,epoll 使用一组函数来完成任务,而不是单个函数。其次,epoll 把用户关心的文件描述符上的事件放在内核里的一个事件表中。从而无需像select和poll那样每次调用都要重复传入文件描述符或事件集。但 epoll 需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。epoll是一组方法的总称,包括:
- epoll_create(int size): 用于创建内核事件表,底层为红黑树。成功返回内核事件表的文件描述符,失败返回-1。size 参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大,传入的实参合法即可。
- epoll_ctl(int epfd, int op, int fd, struct epoll_event *event): 用于操作内核事件表
- epoll_wait(): 用于在一段超时时间内等待一组文件描述符上的事件
#define _GNU_SOURCE #include四、LT和ET模式 1. LT和ET的基本概念#include #include #include #include #include #include #include #define MAX 10 int socket_init(){ int listenfd = socket(AF_INET, SOCK_STREAM, 0); if(listenfd == -1){ return -1; } struct sockaddr_in ser_addr; memset(&ser_addr, 0 ,sizeof(ser_addr)); ser_addr.sin_family = AF_INET; ser_addr.sin_port = htons(8888); ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); int res = bind(listenfd, (struct sockaddr*)&ser_addr, sizeof(ser_addr)); assert(res != -1); res = listen(listenfd, 5); assert(res != -1); return listenfd; } // epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) // epoll_fd:内核事件表的id,把描述符添加到内核事件表中,用于后面检测是否发生事件 void epoll_add(int epoll_fd, int fd){ struct epoll_event event; // 将fd封装成结构体后,放入内核事件表 event.events = EPOLLIN | EPOLLRDHUP; // 在内核事件表中注册事件 event.data.fd = fd; // 在内核事件表中注册描述符 if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1){ perror("epoll_ctl add errorn"); } } // 当客户端关闭连接,则把相应描述符从内核事件表中移除 void epoll_del(int epoll_fd, int fd){ if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1){ perror("epoll_ctl delete errorn"); } } int main(){ int listenfd = socket_init(); // 创建内核事件表 int epoll_fd = epoll_create(MAX); assert(epoll_fd != -1); // 添加描述符到内核事件表 epoll_add(epoll_fd, listenfd); // 一次最多获取MAX个有事件的描述符,若有事件的描述符过多,则分多次获取 // 存放有事件的描述符 struct epoll_event events[MAX]; while(1){ // int epoll_wait (int __epfd, struct epoll_event *__events, int __maxevents, int __timeout); // 从内核事件表epoll_fd中获取有事件的描述符以struct epoll_event的形式放在events // epoll_wait的返回值不大于MAX int n = epoll_wait(epoll_fd, events, MAX, 5000); if(n == -1){ perror("epoll wait errorn"); }else if(n == 0){ perror("epoll timeoutn"); }else{ // select和poll需要遍历所有的文件描述符 // 而epoll不需要遍历所有的,只需要前n个元素即可 for(int i = 0; i < n; i++){ int cur_fd = events[i].data.fd; // 一旦客户端关闭,都会收到POLLRDHUP事件 if(events[i].events & POLLRDHUP){ printf("client:%d hup closen", cur_fd); epoll_del(epoll_fd, cur_fd); close(cur_fd); continue; } if(events[i].events & EPOLLIN){ if(cur_fd == listenfd){ // accept struct sockaddr_in cli_addr; int len = sizeof(cli_addr); int conn = accept(listenfd, (struct sockaddr*)&cli_addr, &len); if(conn < 0){ printf("客户端连接失败!n"); continue; } printf("client %s:%d 连接成功,使用的描述符:%dn", inet_ntoa(((struct sockaddr_in)cli_addr).sin_addr), ntohs(((struct sockaddr_in)cli_addr).sin_port), conn); // 新的描述符添加到内核事件表 epoll_add(epoll_fd, conn); }else{ // recv char buff[128] = {0}; int num = recv(cur_fd, buff, 127, 0); if(num <= 0){ printf("客户端:%d 关闭n", cur_fd); epoll_del(epoll_fd, cur_fd); close(cur_fd); continue; // 当前描述符关闭,再检测下一个描述符 } printf("buff(%d):%sn",num, buff); send(cur_fd, buff, num, 0); } } } } } return 0; }
epoll 对文件描述符有两种操作模式:LT(Level Trigger,电平触发)模式和 ET(Edge Trigger,边沿触发)模式。LT 模式是默认的工作模式。当往 epoll 内核事件表中注册一个文件描述符上的EPOLLET 事件时,epoll 将以高效的 ET 模式来操作该文件描述符。
对于 LT 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用 epoll_wait 时,还会再次向应用程序通告此事件,直到该事件被处理。
对于 ET 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的 epoll_wait 调用将不再向应用程序通知这一事件。等事件再次发生,才会再次提醒。 所以 ET 模式在很大程度上降低了同一个 epoll 事件被重复触发的次数,因此效率比 LT 模式高。
epoll默认处于LT模式,也就是内核会不断提醒应用程序,知道程序处理完数据
int num = recv(cur_fd, buff, 1, 0);2. ET的epoll服务器完整代码
- 文件描述符开启 ET模式
- recv设置为 非阻塞,缓冲区没数据的时候返回-1,而不是阻塞
- 循环读取 缓冲区的数据,直到读完
#define _GNU_SOURCE #include3. 读取完缓冲区的客户端完整代码#include #include #include #include #include #include #include #include #include #define RD_FIN_STR "服务器数据读取完成n" #define MAX 10 int socket_init(){ int listenfd = socket(AF_INET, SOCK_STREAM, 0); if(listenfd == -1){ return -1; } struct sockaddr_in ser_addr; memset(&ser_addr, 0 ,sizeof(ser_addr)); ser_addr.sin_family = AF_INET; ser_addr.sin_port = htons(8888); ser_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); int res = bind(listenfd, (struct sockaddr*)&ser_addr, sizeof(ser_addr)); assert(res != -1); res = listen(listenfd, 5); assert(res != -1); return listenfd; } void set_nonblock(int fd){ // 获取文件描述符fd的属性 int old_feature = fcntl(fd, F_GETFL); int new_feature = old_feature | O_NONBLOCK; // 将新属性设置到fd if(fcntl(fd, F_SETFL, new_feature) == -1){ perror("fcntl errorn"); } } // epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) // epoll_fd:内核事件表的id,把描述符添加到内核事件表中,用于后面检测是否发生事件 void epoll_add(int epoll_fd, int fd){ struct epoll_event event; // 将fd封装成结构体后,放入内核事件表 event.events = EPOLLIN | EPOLLRDHUP | EPOLLET; // 在内核事件表中注册事件 event.data.fd = fd; // 在内核事件表中注册描述符 set_nonblock(fd); // 将描述符设置为非阻塞 if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1){ perror("epoll_ctl add errorn"); } } // 当客户端关闭连接,则把相应描述符从内核事件表中移除 void epoll_del(int epoll_fd, int fd){ if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1){ perror("epoll_ctl delete errorn"); } } int main(){ int listenfd = socket_init(); // 创建内核事件表 int epoll_fd = epoll_create(MAX); assert(epoll_fd != -1); // 添加描述符到内核事件表 epoll_add(epoll_fd, listenfd); // 一次最多获取MAX个有事件的描述符,若有事件的描述符过多,则分多次获取 // 存放有事件的描述符 struct epoll_event events[MAX]; while(1){ // int epoll_wait (int __epfd, struct epoll_event *__events, int __maxevents, int __timeout); // 从内核事件表epoll_fd中获取有事件的描述符以struct epoll_event的形式放在events // epoll_wait的返回值不大于MAX printf("开始epoll_waitn"); int n = epoll_wait(epoll_fd, events, MAX, 5000); if(n == -1){ perror("epoll wait errorn"); }else if(n == 0){ perror("epoll timeoutn"); }else{ // select和poll需要遍历所有的文件描述符 // 而epoll不需要遍历所有的,只需要前n个元素即可 for(int i = 0; i < n; i++){ int cur_fd = events[i].data.fd; // 一旦客户端关闭,都会收到POLLRDHUP事件 if(events[i].events & POLLRDHUP){ printf("client:%d hup closen", cur_fd); epoll_del(epoll_fd, cur_fd); close(cur_fd); continue; } if(events[i].events & EPOLLIN){ if(cur_fd == listenfd){ // accept struct sockaddr_in cli_addr; int len = sizeof(cli_addr); int conn = accept(listenfd, (struct sockaddr*)&cli_addr, &len); if(conn < 0){ printf("客户端连接失败!n"); continue; } printf("client %s:%d 连接成功,使用的描述符:%dn", inet_ntoa(((struct sockaddr_in)cli_addr).sin_addr), ntohs(((struct sockaddr_in)cli_addr).sin_port), conn); // 新的描述符添加到内核事件表 epoll_add(epoll_fd, conn); }else{ // recv while(1){ char buff[128] = {0}; // fd已经被设置为非阻塞,读取不到不会阻塞,而回返回-1 int num = recv(cur_fd, buff, 1, 0); if(num == -1){ // 不是因为数据读完而出错 // errno是errno.h中的一个全局变量,如果读取出错,内核会填写errno if(errno != EAGAIN && errno != EWOULDBLOCK){ perror("recv errorn"); }else{ // 数据读完导致num == -1 printf("数据读取完成n"); send(cur_fd, RD_FIN_STR, sizeof(RD_FIN_STR), 0); } break; // 数据读完或出错,退出接收数据 }else if(num == 0){ printf("客户端:%d 关闭n", cur_fd); epoll_del(epoll_fd, cur_fd); close(cur_fd); break; // 客户端关闭,退出接收数据 }else{ printf("buff(%d):%sn",num, buff); send(cur_fd, buff, num, 0); } } } } } } } return 0; }
#define _GNU_SOURCE #include#include #include #include #include #include #include #include #include #include void set_nonblock(int fd){ // 获取文件描述符fd的属性 int old_feature = fcntl(fd, F_GETFL); int new_feature = old_feature | O_NONBLOCK; // 将新属性设置到fd if(fcntl(fd, F_SETFL, new_feature) == -1){ perror("fcntl errorn"); } } int main(){ int sockfd = socket(AF_INET, SOCK_STREAM, 0); if(sockfd == -1){ printf("create socket failed!n"); return 0; } struct sockaddr_in cli_addr; memset(&cli_addr, 0, sizeof(cli_addr)); cli_addr.sin_family = AF_INET; // 地址族 cli_addr.sin_port = htons(8888); // host to net short cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); // inet_addr将字符串转为无符号整型 // 可以将套接字绑定ip,但一般客户端不绑定,让OS随机分配port int res = connect(sockfd, (struct sockaddr*)&cli_addr, sizeof(cli_addr)); // 连接server assert(res != -1); set_nonblock(sockfd); while(1){ char buff[128] = {0}; printf("input:"); fflush(stdout); fgets(buff, 128, stdin); if(strcmp(buff, "exitn") == 0){ break; } send(sockfd, buff, strlen(buff), 0); while(1){ memset(buff, 0 ,128); // 客户端send后,给服务器充足的时间读取以及回复 sleep(1); // 从缓冲区尽可能多的取数据,最多取127字节 int n = recv(sockfd, buff, 127, 0); if(n == -1){ // 不是因为数据读完而出错 // errno是errno.h中的一个全局变量,如果读取出错,内核会填写errno if(errno != EAGAIN && errno != EWOULDBLOCK){ perror("recv errorn"); } // else{ // printf("服务器读完了,breakn"); // } break; // 数据读完或出错,退出接收数据 }else if(n == 0){ printf("服务端:%d 关闭n", sockfd); break; // 客户端关闭,退出接收数据 }else{ printf("buff(%d):%sn",n, buff); } } } close(sockfd); return 0; }



