1、I/O复用系统调用的超时参数 在网络程序中我们通常要处理三种事件,网络I/O事件、信号以及定时事件,我们可以使用I/O复用系统调用(select、poll、epoll)将这三类事件进行统一处理。我们通常使用定时器来检测一个客户端的活动状态,服务器程序通常管理着众多定时事件,因此有效地组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,对于服务器的性能有着至关重要的影响。为此我们需要将每个定时事件分别封装为定时器,并使用某种容器类数据结构,比如:链表、排序链表、最小堆、红黑树以及时间轮等,将所有定时器串联起来,以实现对定时事件的统一管理。此处所说的定时器,确切的说应该是定时容器,定时器容器是容器类数据结构;定时器则是容器内容纳的一个个对象,它是对定时事件的封装,定时容器是用来管理定时器的。
在本文中将主要介绍使用最小堆来实现的定时容器。
Linux下的3组I/O复用系统调用(select、poll、epoll)都带有定时参数,因此他们不仅能统一处理信号和I/O事件,也能统一处理定时事件。我们可以使用定时容器和I/O复用系统调用来共同实现定时器的触发。
这三个系统调用的定义如下:
#includeint select(int fds, fd_set *readfds, fd_set *writerfds, fd_set *exceptfds, struct timeval *timeout); struct timeval { long tv_sec; //秒数 long tv_usec; //微妙数 } #include int poll(struct pollfd *fds, nfds_t nfds, int timeout); #include int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
这三个系统调用都有一个timeout的参数,当发生I/O事件时,这三个系统调用将会返回;当指定的时间到达时,如果没有I/O事件发生,这三个系统调用也会返回。
2、时间堆定时容器 该定时器容器的思路是:将所有定时器中超时时间最小的一个定时器的超时值作为心搏间隔,这样,一旦心搏函数tick被调用,超时时间最小的定时器必然到期,我们就可以在tick函数中处理该定时器。然后,再从剩余的定时器中找到超时时间最小的一个,并将这段最小时间设置为下一次心搏间隔,如此反复,就实现了较为精确的定时。
最小堆很适合处理这种定时方案。最小堆是指每个节点的值都小于或等于其子节点的值的完全二叉树。
树的基本操作是插入和删除节点。对最小堆而言,它们都很简单。为了将一个元素X插入最小堆,我们可以在树的下一个空闲位置创建一个空穴,如果X可以放在空穴中而不破坏堆序,则插入完成。否则就执行上滤操作,即交换空穴和它的父节点上的元素,不断执行上述过程,直至X可以放入空穴,则插入操作完成。可以按照下图的步骤来操作,假设要在最小堆中插入一个元素14:
最小堆的删除操作指的是删除其根节点上的元素,并且不破坏堆序性质。执行删除顶部元素操作时,我们需要先在根节点处创建一个空穴,由于堆现在少了一个元素,因此我们可以将堆的最后一个元素X移动到该堆的某个地方。如果X可以被放入空穴,则删除操作完成。否则就执行下滤操作,即交互空穴和它的两个儿子中的较小者。不断进行上述过程,直至X可以被放入空穴,则删除操作完成。比如我们对上图插入元素前的最小堆执行删除顶部元素操作:
由于最小堆是一种完全二叉树,所以我们可以使用数组来组织其中的元素。对于数组中的任意一个位置i上的元素,其左儿子节点在位置2i+1上,右儿子在位置2i+2上,其父节点在位置[(i - 1) / 2](i>0)上。与用链表来表示堆相比,用数组不仅节省空间,而且更容易实现堆的插入、删除等操作。
最小堆实现的定时容器的代码实现如下:
在timer_common.hpp中定义了Timer以及ITimerContainer两个类,Timer类为定时器类,ITimerContiner类为定时器容器的一个虚基类或者说是接口,后续将实现最小堆定时器、时间轮定时器以及红黑树定时器,这几个定时器都实现了ITimerContainer中的方法。
最小堆定时容器的几个接口介绍:
1) tick :在tick函数中循环查找超时值最小的定时器,并调用其回调函数,直到找到的定时器的没有超时,就结束循环。
2)addTimer::向容器中添加一个定时器,并返回定时器的指针。
3)delTimer::根据传入的定时器指针删除容器中的一个定时器,并且销毁资源。
4)resetTimer: 重置一个定时器。
5)top:获取容器中超时值最小的定时器;
6)popTimer:删除容器中超时值最小的定时器,并销毁资源。
timer_common.hpp:
#ifndef _LIB_SRC_TIMER_COMMON_H #define _LIB_SRC_TIMER_COMMON_H #include#include // 获取时间戳 单位:毫秒 time_t getMSec() { struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec * 1000 + tv.tv_usec / 1000; } // 定时器数据结构的定义 template class Timer { public: Timer() = default; Timer(int msec) { this->_expire = getMSec() + msec; } ~Timer() { } void setTimeout(time_t timeout) { this->_expire = getMSec() + timeout; } time_t getExpire() { return _expire; } void setUserData(_User_Data *userData) { this->_user_data = userData; } void handleTimeOut() { if(_cb_func) { _cb_func(_user_data); } } using TimeOutCbFunc = void (*)(_User_Data *); void setCallBack(TimeOutCbFunc callBack) { this->_cb_func = callBack; } private: time_t _expire; // 定时器生效的绝对时间 _User_Data *_user_data; // 用户数据 TimeOutCbFunc _cb_func; // 超时时的回调函数 }; // 虚基类 ITimerContainer template class ITimerContainer { public: ITimerContainer() = default; virtual ~ITimerContainer() = default; public: virtual void tick() = 0; virtual Timer<_UData> *addTimer(time_t timeout) = 0; virtual void delTimer(Timer<_UData> *timer) = 0; virtual void resetTimer(Timer<_UData> *timer, time_t timeout) = 0; virtual Timer<_UData> *top() = 0; virtual void popTimer() = 0; }; #endif
heap_timer.hpp:
#ifndef _LIB_SRC_HEAP_TIMER_H_ #define _LIB_SRC_HEAP_TIMER_H_ #include#include "timer_common.hpp" #define HEAP_DEFAULT_SIZE 128 // 定时器数据结构的定义 template class HeapTimer { public: HeapTimer() = default; HeapTimer(int msec) { timer.setTimeout(msec); } ~HeapTimer() { } void setTimeout(time_t timeout) { timer.setTimeout(timeout); } time_t getExpire() { return timer.getExpire(); } void setUserData(_User_Data *userData) { timer.setUserData(userData); } int getPos() { return _pos; } void setPos(int pos) { this->_pos = pos; } void handleTimeOut() { timer.handleTimeOut(); } using TimeOutCbFunc = void (*)(_User_Data *); void setCallBack(TimeOutCbFunc callBack) { timer.setCallBack(callBack); } public: Timer<_User_Data> timer; private: int _pos; // 保存该定时器在数组中的位置,以便查找删除操作 }; // 定时容器,使用最小堆实现 template class HeapTimerContainer : public ITimerContainer<_UData> { public: HeapTimerContainer(); HeapTimerContainer(int capacity); HeapTimerContainer(HeapTimer<_UData> **initArray, int arrSize, int capacity); virtual ~HeapTimerContainer() override; public: virtual void tick() override; Timer<_UData> *addTimer(time_t timeout) override; void delTimer(Timer<_UData> *timer) override; void resetTimer(Timer<_UData> *timer, time_t timeout) override; Timer<_UData> *top() override; void popTimer() override; private: void percolateDown(int hole); void percolateUp(int hole); void resize(); bool isEmpty(); private: HeapTimer<_UData> **_array; // 堆数据 int _capacity; // 堆数组的容量 int _size; // 当前包含的元素 }; template HeapTimerContainer<_UData>::HeapTimerContainer() : HeapTimerContainer(HEAP_DEFAULT_SIZE) { } // 初始化一个大小为cap的空堆 template HeapTimerContainer<_UData>::HeapTimerContainer(int capacity) { this->_capacity = capacity; this->_size = 0; _array = new HeapTimer<_UData> *[capacity]{nullptr}; } // 用已有数组来初始化堆 template HeapTimerContainer<_UData>::HeapTimerContainer(HeapTimer<_UData> **initArray, int arrSize, int capacity) : _size(arrSize) { if(capacity < arrSize) { this->_capacity = capacity = 2 * arrSize; } _array = new HeapTimer<_UData> *[capacity]; for (int i = 0; i < capacity; i++) { _array[i] = nullptr; } if(arrSize > 0) { for (int i = 0; i < arrSize; i++) { _array[i] = initArray[i]; } for(int i = (_size - 1) / 2; i >= 0; i--) { percolateDown(i); //对数组中的第(_size - 1) / 2 ~ 0个元素执行下滤操作 } } } template HeapTimerContainer<_UData>::~HeapTimerContainer() { if(_array) { for(int i = 0; i < _size; i++) { delete _array[i]; } delete []_array; } } template void HeapTimerContainer<_UData>::tick() { std::cout << "----------tick----------" << std::endl; HeapTimer<_UData> *tmp = _array[0]; time_t cur = getMSec(); // 循环处理到期的定时器 while(!isEmpty()) { if(!tmp) { break; } // 如果定时器没到期,则退出循环 if(tmp->getExpire() > cur) { break; } tmp->handleTimeOut(); // 将堆顶元素删除,同时生成新的堆顶定时器 popTimer(); tmp = _array[0]; } } // 获取一个定时器 template Timer<_UData> *HeapTimerContainer<_UData>::addTimer(time_t timeout) { if(_size >= _capacity) { this->resize(); //如果容量不够,则进行扩容 } // hole是新建空穴的位置 int hole = _size++; HeapTimer<_UData> *timer = new HeapTimer<_UData>(timeout); _array[hole] = timer; percolateUp(hole); return &timer->timer; } // 删除目标定时器 template void HeapTimerContainer<_UData>::delTimer(Timer<_UData> *timer) { if(!timer) { return ; } timer->setCallBack(nullptr); timer->setUserData(nullptr); } // 重置一个定时器 template void HeapTimerContainer<_UData>::resetTimer(Timer<_UData> *timer, time_t timeout) { // 类型强转 HeapTimer<_UData> *htimer = reinterpret_cast< HeapTimer<_UData>* >(timer); // 找到该定时器在数组中的位置,将其与最后一个定时器的位置交换,然后先进行下滤操作,再进行上滤操作 int pos = htimer->getPos(); int lastPos = _size - 1; if(pos != lastPos) { HeapTimer<_UData> *temp = _array[pos]; _array[pos] = _array[lastPos]; _array[lastPos] = temp; } timer->setTimeout(timeout); // 下滤 上滤 percolateDown(pos); percolateUp(lastPos); } // 获得堆顶的定时器 template Timer<_UData> *HeapTimerContainer<_UData>::top() { if(isEmpty()) { return nullptr; } return &_array[0]->timer; } // 删除堆顶元素 template void HeapTimerContainer<_UData>::popTimer() { if(isEmpty()) { return; } if(_array[0]) { delete _array[0]; // 将原来的堆顶元素替换为堆数组中最后一个元素 _array[0] = _array[--_size]; // 对新的堆顶元素执行下滤操作 percolateDown(0); } } // 最小堆的下滤操作,它确保数组中以第hole个节点作为根的子树拥有最小堆性质 template void HeapTimerContainer<_UData>::percolateDown(int hole) { HeapTimer<_UData> *temp = _array[hole]; int child = 0; for(; ((hole * 2 + 1) <= _size - 1); hole = child) { child = hole * 2 + 1; if((child < (_size - 1)) && (_array[child + 1]->getExpire() < _array[child]->getExpire())) { child++; } if(_array[child]->getExpire() < temp->getExpire()) { _array[hole] = _array[child]; _array[hole]->setPos(hole); // 调整定时器的位置时,重新设置timer中pos保存的其在数组中的位置 } else { break; } } _array[hole] = temp; _array[hole]->setPos(hole); } template void HeapTimerContainer<_UData>::percolateUp(int hole) { int parent = 0; HeapTimer<_UData> *temp = _array[hole]; // 对从空穴到根节点的路径上的所有节点执行上滤操作 for(; hole > 0; hole = parent) { parent = (hole - 1) / 2; // 将新插入节点的超时值与父节点比较,如果父节点的值小于等于该节点的值,那么就无需再调整了。否则将父节点下移,继续这个操作。 if(_array[parent]->getExpire() <= temp->getExpire()) { break; } _array[hole] = _array[parent]; _array[hole]->setPos(hole); } _array[hole] = temp; _array[hole]->setPos(hole); } // 将数组的容量扩大一倍 template void HeapTimerContainer<_UData>::resize() { HeapTimer<_UData> **temp = new HeapTimer<_UData> *[2 * _capacity]; _capacity = 2 * _capacity; for(int i = 0; i < _size; i++) { temp[i] = _array[i]; } for(int i = _size; i < _capacity; i++) { temp[i] = nullptr; } delete []_array; _array = temp; } template bool HeapTimerContainer<_UData>::isEmpty() { return _size == 0; } #endif
下面的测试代码为使用epoll实现的一个回射服务器,每个客户端连接服务端后会为每个连接设置一个定时器,超时时间为15秒,每次进行数据交互后就会重置连接对应的定时器,如果定时器超时就会被服务器踢掉。程序中使用epoll_wait来将I/O事件与定时事件进行统一处理,使用定时容器中最小的超时时间作为epoll_wait的超时时长。启动服务端,连接三个客户端进行测试。可以看到三个客户端在超时时间到的时候都被踢掉了。如果客户端在超时时间内发送数据,那么服务端就会重置相应客户端的定时器。
test_heap_timer.cpp:
#include#include #include #include #include #include #include #include #include #include #include #include "heap_timer.hpp" using std::cout; using std::endl; #define PORT 6666 #define MAX_EVENTS 1024 #define MAX_BUF_SIZE 1024 struct Event; using readHandle = void(*)(Event *, ITimerContainer *); using writeHandle = void(*)(Event *, ITimerContainer *); // 自定义结构体,用来保存一个连接的相关数据 struct Event { int fd; char ip[64]; uint16_t port; epoll_event event; void *timer; char buf[MAX_BUF_SIZE]; int buf_size; readHandle read_cb; writeHandle write_cb; }; int epfd; // 超时处理的回调函数 void timeout_handle(Event *cli) { if(cli == nullptr) { return ; } cout << "Connection time out, fd:" << cli->fd << " ip:[" << cli->ip << ":" << cli->port << "]" << endl; epoll_ctl(epfd, EPOLL_CTL_DEL, cli->fd, &cli->event); close(cli->fd); delete cli; } void err_exit(const char *reason) { cout << reason << ":" << strerror(errno) << endl; exit(1); } // 设置非阻塞 int setNonblcoking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } // 设置端口复用 void setReusedAddr(int fd) { int reuse = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); } // 初始化server socket int socket_init(unsigned short port, bool reuseAddr) { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd < 0) { err_exit("socket error"); } if(reuseAddr) { setReusedAddr(fd); } struct sockaddr_in addr; bzero(&addr, 0); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = htonl(INADDR_ANY); int ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr)); if(ret < 0) { err_exit("bind error"); } setNonblcoking(fd); ret = listen(fd, 128); if(ret < 0) { err_exit("listen error"); } return fd; } void readData(Event *ev, ITimerContainer *htc) { ev->buf_size = read(ev->fd, ev->buf, MAX_BUF_SIZE - 1); if(ev->buf_size == 0) { close(ev->fd); htc->delTimer((Timer *)ev->timer); epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ev->event); delete ev; } ev->event.events = EPOLLOUT; epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &ev->event); } void writeData(Event *ev, ITimerContainer *htc) { write(ev->fd, ev->buf, ev->buf_size); ev->event.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_MOD, ev->fd, &ev->event); // 重新设置定时器 htc->resetTimer((Timer *)ev->timer, 15000); } // 接收连接回调函数 void acceptConn(Event *ev, ITimerContainer *htc) { Event *cli = new Event; struct sockaddr_in cli_addr; socklen_t sock_len = sizeof(cli_addr); int cfd = accept(ev->fd, (struct sockaddr *)&cli_addr, &sock_len); if(cfd < 0) { cout << "accept error, reason:" << strerror(errno) << endl; return; } setNonblcoking(cfd); cli->fd = cfd; cli->port = ntohs(cli_addr.sin_port); inet_ntop(AF_INET, &cli_addr.sin_addr, cli->ip, sock_len); cli->read_cb = readData; cli->write_cb = writeData; auto timer = htc->addTimer(15000); //设置客户端超时值15秒 timer->setUserData(cli); timer->setCallBack(timeout_handle); cli->timer = (void *)timer; cli->event.events = EPOLLIN; cli->event.data.ptr = (void *) cli; epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &cli->event); cout << "New Connection, ip:[" << cli->ip << ":" << cli->port << "]" << endl; } int main(int argc, char *argv[]) { int fd = socket_init(PORT, true); Event server; server.fd = fd; epfd = epoll_create(MAX_EVENTS); if(epfd < 0) { err_exit("epoll create error"); } server.event.events = EPOLLIN; server.event.data.ptr = (void *)&server; epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &server.event); ITimerContainer *htc = new HeapTimerContainer ; struct epoll_event events[MAX_EVENTS]; int nready = 0; int timeout = 10000; //设置超时值为10秒 while(1) { // 将定时容器中定时时间最短的时长作为epoll_wait的最大等待时间 auto temp_timer = htc->top(); if(temp_timer != nullptr) { timeout = temp_timer->getExpire() - getMSec(); } else { timeout = 10000; } nready = epoll_wait(epfd, events, MAX_EVENTS, timeout); if(nready < 0) { cout << "epoll wait error, reason:" << strerror(errno) << endl; } else if(nready > 0) { // 接收新的连接 for(int i = 0; i < nready; i++) { Event *ev = (Event *) events[i].data.ptr; // 接受新的连接 if(ev->fd == fd ) { acceptConn(ev, htc); } else if(ev->event.events & EPOLLIN) { ev->read_cb(ev, htc); } else if(ev->event.events & EPOLLOUT) { ev->write_cb(ev, htc); } } } else { htc->tick(); } } delete htc; return 0; }
本人能力有限,代码中难免存在一些Bug,还请见谅。如果有好的建议,敬请提出。
参考资料:《Linux高性能服务器编程》



