- 11.1 socket选项SO_RCVTIMEO和SO_SNDTIMEO
- 11.2 SIGALRM信号
- 11.2.1 基于升序链表的定时器
- 11.2.2 处理非活动连接
- 11.3 I/O复用系统调用的超时参数
- 11.4 高性能定时器
- 11.4.1 时间轮
- 11.4.2 时间堆
socket选项的SO_RCVTIMEO和SO_SNDTIMEO分别用来设置socket接收数据超时事件和发送数据的超时时间,仅对send、sendmsg、recv、recvmsg、accpet和connect有效。
| 系统调用 | 有效选项 | 系统调用超时后的行为 |
|---|---|---|
| send | SO_SNDTIMEO | 返回-1,设置errno为EGAIN或EWOULDBLOCK |
| sendmsg | SO_SNDTIMEO | 返回-1,设置errno为EGAIN或EWOULDBLOCK |
| recv | SO_RCVTIMEO | 返回-1,设置errno为EGAIN或EWOULDBLOCK |
| recvmsg | SO_RCVTIMEO | 返回-1,设置errno为EGAIN或EWOULDBLOCK |
| accept | SO_RCVTIMEO | 返回-1,设置errno为EGAIN或EWOULDBLOCK |
| connect | SO_SNDTIMEO | 返回-1,设置errno为EGAIN或EWOULDBLOCK |
实例:设置connect超时时间:
#include#include #include #include #include #include #include #include #include #include #include int timeout_connect(const char* ip, int port, int time){ int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int sockfd = socket(PF_INET, SOCK_STERAM, 0); assert(sockfd>0); // 通过选项设置超时时间类型为timeval struct timeval timeout; timeout.tv_sec = time; timeout.tv_usec = 0; socklen_t len = sizeof(timeout); ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len); assert(ret!=-1); ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address)); if(ret == -1){ // 超时错误号对应的EINPROGRESS if(errno == EINPROGRESS){ printf("connecting timeout, process timeoutn") return -1; } printf("error occur when connectiong to servern"); return -1; } return sockfd; } int main(int argc, char* argv[]){ if(argc<=2){ printf("usage: %s ip_adress port_numbern", basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[2]); int sockfd = timeout_connect(ip, port, 10); if(sockfd < 0){ return 1; } return 0; }
11.2 SIGALRM信号
由alarm和setitimer函数设置的闹钟一旦超时,将触发SIGALRM信号,可利用该信号的信号处理函数来处理定时任务。一般而言,SIGALRM信号按照固定的频率生成,即由alarm或setitimer函数设置的定时周期T保持不变。如果某个定时任务的超时时间不是T的整数倍,则其被执行的时间和预期的时间有偏差,可通过T反应定时精度。
11.2.1 基于升序链表的定时器定时器通常至少包含两个成员:超时时间和任务回调函数。如果用链表作为容器来串联所有的定时器,则每个定时器还要包含指向下一个定时器的指针成员(单向链表)。
实例:升序定时器链表
#ifndef LST_TIMER #define LST_TIMER #include11.2.2 处理非活动连接#include #define BUFFER_SIZE 64 class util_timer; // 前向声明 // 用户数据结构:客户都安socket地址、socket文件描述符、读缓存和定时器 struct client_data{ sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; util_timer* timer; }; // 定时器类 class util_timer{ public: util_timer() : prev(NULL), next(NULL){}; public: time_t expire; // 任务超时时间,这里使用绝对时间 void (*cb_func)(client_data*); // 任务回调函数 // 回调函数处理的客户数据,由定时器的执行者传递给回调函数 client_data* user_data; util_timer* prev; // 指向前一个定时器 util_timer* next; // 指向下一个定时器 }; // 定时器链表。是一个升序、双向链表,带有头节点和尾节点 class sort_timer_lst{ public: sort_timer_lst() : head(NULL),tail(NULL){}; // 析构函数,销毁时删除所有定时器 ~sort_timer_lst(){ util_timer* tmp = head; while(tmp){ head = tmp->next; delete tmp; tmp = head; } } // 将目标定时器timer添加到链表中 void add_timer(util_timer* timer){ if(!timer){ return; } if(!head){ head = tail = timer; return; } // 如果目标定时器的时间小于当前链表中所有定时器的时间,则将该定时器插入链表头部,作为头节点 // 否则调用重载函数add_timer,将它插入链表合适位置,保证链表的升序特性 if(timer->expire < head->expire){ timer->next = head; head->prev = timer; head = timer; return; } add_timer(timer, head); } // 当某个定时任务发生变化时,调整对应的定时器在链表中的位置 // 只考虑被调整的定时器的超时时间延长的情况,即该定时器需要向链表尾部移动 void adjust_timer(util_timer* timer){ if(!timer){ return; } util_timer* tmp = timer->next; // 如果被调整的目标定时器在链表尾部,或者该定时新的超时值仍然小于下一个,则不用调整 if(!tmp || (timer->expire < tmp->expire)){ return; } // 人目标定时器是链表的头节点,则将该定时器从链表中取出并重新插入链表 if(timer==head){ head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer, head); } // 如果目标定时器不是链表的头节点,则将该定时器从链表中取出,然后重新插入 else{ timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer, timer->next); } } // 从链表中删除目标定时器 void del_timer(util_timer* timer){ if(!timer){ return; } // 若链表中只有一个定时器 if((timer==head)&&(timer==tail)){ delete timer; head = NULL; tail = NULL; return; } // 如果链表中至少有两个定时器。且目标定时器是链表头节点。 if(timer==head){ head = head->next; head->prev = NULL; delete timer; return; } // 如果定时器是尾节点 if(timer == tail){ tail = timer->prev; tail->next = NULL; delete timer; return; } // 如果定时器在链表中间 timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer; } // SIGALRM信号被触发一次及阻碍信号处理函数中执行一次tick函数,以处理链表上到期的任务 // 相当于心搏函数,每隔一段时间就执行一次,检测并处理到期的任务 void tick(){ if(!head){ return; } printf("timer tickn"); time_t cur = time(NULL); // 获得系统当前时间 util_timer* tmp = head; // 从头节点开始依次处理每个定时器,直到遇到尚未到期的定时器 while(tmp){ // 因为每个定时器都是用绝对事件作为超时值 // 将定时器超时值和系统当前时间做对比 if(cur < tmp->expire){ break; } // 调用定时器回调函数,以执行定时任务 tmp->cb_func(tmp->user_data); head = tmp->next; if(head){ head->prev = NULL; } delete tmp; tmp = head; } } private: // 重载add_timer,将目标定时器添加到节点lst_head后面的链表部分 void add_timer(util_timer* timer, util_timer* lst_head){ util_timer* prev = lst_head; util_timer* tmp = prev->next; // 遍历lst_head后面的链表,将目标定时器插入 while(tmp){ if(timer->expire expire){ prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } // 如果遍历完tmp,则需要将目标定时器插入到尾部 if(!tmp){ prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; } } private: util_timer* head; util_timer* tail; }; #endif
服务器程序通常要定期处理非活动连接:给客户都安发送一个重连请求,或者关闭该连接,或者其他。Linux在内核中提供了对连接是否处于活动状态的定期检查机制,可通过socketKEEPALIVE进行激活,但此方法使得应用程序对连接的管理变得复杂。
可考虑在应用层实现类似KEEPALIVE的机制,管理长时间处于非活动状态的链接。可利用alarm函数定期触发SIGALRM信号,该信号的信号处理函数利用管道通道通知主循环执行定时器链表上的定时任务————关闭非活动的连接。
实例:关闭非活动连接
#include#include #include #include #include #include #include #include #include #include #include #include #include #include "升序定时器链表.h" #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define TIMESLOT 5 static int pipefd[2]; // 利用升序链表管理定时器 static sort_timer_lst timer_lst; static int epollfd = 0; // 将文件描述符设置成非阻塞 int setnonblocking(int fd){ int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } // 将文件描述符fd上的EPOLLIN注册到epollfd指示的epoll内核事件表中,参数enable_et指定是否对fd启用ET模式 void addfd(int epollfd, int fd, bool enable_et){ epoll_event event; event.data.fd = fd; event.events = EPOLLIN; if(enable_et){ event.events |= EPOLLET; } epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } void sig_handler(int sig){ int save_errno = errno; int msg = sig; send(pipefd[1], (char*)&msg, 1, 0); errno = save_errno; } // 设置信号的处理函数 void addsig(int sig){ struct sigaction sa; memset(&sa, ' ', sizeof(sa)); sa.sa_handler = sig_handler; sa.sa_flags != SA_RESTART; sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL)!=-1); } void timer_handler(){ // 定时处理任务,实际是调用tick函数 timer_lst.tick(); // alarm调用只会引起一次SIGALRM信号,需要重新定时,以不断触发SIGALRM信号 alarm(TIMESLOT); } // 定时器回调函数,剔除非活动连接socket的注册时间,并关闭 void cb_func(client_data* user_data){ epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); assert(user_data); close(user_data->sockfd); printf("close fd %dn", user_data->sockfd); } int main(int argc, char* argv[]){ if(argc<=2){ printf("usage: %s ip_address port_number n", basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int listenfd = socket(PF_INET, SOCK_STERAM, 0); assert(sock>0); ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); if(ret = -1){ printf("errno is %dn", errno); return 1; } ret = listen(listenfd, 5); assert(ret!=-1); epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd!=-1); addfd(epollfd, listenfd); ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret != -1); setnonblocking(pipefd[1]); addfd(epollfd, pipefd[0]); // 设置信号处理函数 addsig(SIGALRM); addsig(SIGTERM); bool stop_server = false; client_data* users = new client_data[FD_LIMIT]; bool timeout = false; alarm(TIMESLOT); // 定时 while(!stop_server){ int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if((number<0)&&(errno!=EINTR)){ printf("epoll failuren"); break; } for(int i=0;i user_data = &users[connfd]; timer->cb_func = cb_func; time_t cur = timer(NULL); timer->expire = cur + 3*TIMESLOT; users[connfd].timer = timer; timer_lst.add_timer(timer); } // 处理信号 else if((sockfd==pipefd[0])&&(events[i].events&EPOLLIN)){ int sig; char signals[1024]; ret = recv(pipefd[0],signals,sizeof(signals),0); if(ret == -1){ // handle the error continue; } else if(ret == 0){ continue; } else{ for(int i=0;i


