本文主要使用linux epoll实现了一个reactor模型,并且在此基础上实现一个http server demo。网上有很多关于epoll和reactor理论知识的介绍,本篇博客也不过多说明理论基础,如果有兴趣的朋友可以参考其他博客。
epoll API 创建epollint epoll_create(int size);控制EPOLL
int epoll_ctl(int epfd, int op, int fd,
struct epoll_event *event);
struct epoll_event {
__uint32_t events;
epoll_data_t data;
}
typedef union epoll_data {
void *ptr; //用户希望传递的自定义指针,可以存储任意数据,一般在实际开发中使用ptr最多
int fd; //socket fd
uint32_t u32;
uint64_t u64;
} epoll_data_t;
等待EPOLL
int epoll_wait(int epfd, struct epoll_event *event,
int maxevents, int timeout);
epoll的使用
//创建 epoll
int epfd = epoll_crete(1);
//将 listen_fd 添加进 epoll 中
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd,&listen_event);
while (1) {
//阻塞等待 epoll 中 的fd 触发
int active_cnt = epoll_wait(epfd, events, 1000, -1);
for (i = 0 ; i < active_cnt; i++) {
if (evnets[i].data.fd == listen_fd) {
//accept. 并且将新accept 的fd 加进epoll中.
}
else if (events[i].events & EPOLLIN) {
//对此fd 进行读操作
}
else if (events[i].events & EPOLLOUT) {
//对此fd 进行写操作
}
}
}
reactor 模型
reactor解决问题
对于未使用reactor模型的网络服务应用中,当某个socket对应的事件发生时,就去执行对应的处理,比如当某个客户端连接到客户端,我们就去执行客户端的连接处理,或者当客户端给服务器发送数据时,服务器端就执行相应的读逻辑,一般在服务器网络应用中,socket都会设置为非阻塞模式,并且服务器会使用多线程或者线程池来处理大量的客户端的请求,相比于单线程的服务器而言,能极大的提高服务器性能,但是不可忽视的是,线程的切换也会带来巨大的性能损失。
在分布式系统尤其是服务器这一类事件驱动应用中,虽然这些请求最终会被序列化地处理,但是必须时刻准备着处理多个同时到来的服务请求。在实际应用 中,这些请求总是通过一个事件(如CONNECTOR、READ、WRITE等)来表示的。在有序地处理这些服务请求之前,应用程序必须先分离和调度这些 同时到达的事件。为了有效地解决这个问题,我们需要做到以下4方面:
(1)为了提高系统的可测量性和反应时间,应用程序不能长时间阻塞在某个事件源上而停止对其他事件的处理,这样会严重降低对客户端的响应度。
(2) 为了提高吞吐量,任何没有必要的上下文切换、同步和CPU之间的数据移动都要避免。
(3)引进新的服务或改良已有的服务都要对既有的事件分离和调度机制带来尽可能小的影响。
(4)大量的应用程序代码需要隐藏在复杂的多线程和同步机制之后。
在一个或多个事件源上等待事件的到来,例如,一个已经连接的Socket描述符就是一个事件源。将事件的分离和调度整合到处理它的服务中,而将分离和调度机制从应用程序对特定事件的处理中分离开,也就是说分离和调度机制与特定的应用程序无关。
具体来说,每个应用程序提供的每个服务都有一个独立的事件处理器与之对应。由事件处理器处理来自事件源的特定类型的事件。每个事件处理器都事先注册 到Reactor管理器中。Reactor管理器使用同步事件分离器在一个或多个事件源中等待事件的发生。当事件发生后,同步事件分离器通知 Reactor管理器,最后由Reactor管理器调度和该事件相关的事件处理器来完成请求的服务。
reactor结构在Reactor模式中,有5个关键的参与者。
(1)描述符(handle):由操作系统提供,用于识别每一个事件,如Socket描述符、文件描述符等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端的连接请求、数据等。事件也可以来自内部,如定时器事件。
(2)同步事件分离器(demultiplexer):是一个函数,用来等待一个或多个事件的发生。调用者会被阻塞,直到分离器分离的描述符集上有事件发生。Linux的epoll函数是一个经常被使用的分离器。
(3)事件处理器接口(event handler):是由一个或多个模板函数组成的接口。这些模板函数描述了和应用程序相关的对某个事件的操作。
(4) 具体的事件处理器 :是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。
(5)Reactor 管理器(reactor):定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。
Reactor管理器使用同步事件分离器来等待事件的发生。一旦事件发生,Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模 板函数来处理这个事件。 通过上述分析,我们注意到,是Reactor管理器而不是应用程序负责等待事件、分离事件和调度事件。实际上,Reactor管理器并没有被具体的 事件处理器调用,而是管理器调度具体的事件处理器,由事件处理器对发生的事件做出处理。这就是类似Hollywood原则的“反向控制”。应用程序要做的 仅仅是实现一个具体的事件处理器,然后把它注册到Reactor管理器中。接下来的工作由管理器来完成。
reactor实现 结构体
struct ntyevent {
int fd;
int events; //EPOLLIN ,EPOLLOUT
void *arg; //回调参数
int (*callback)(int fd, int events, void *arg);//回调函数
int status;//该事件是否处于reactor管理中
char buffer[BUFFER_LENGTH];//sendbuffer or recvbuffer
int length; //buffer length
//说明:如果想自己使用这个代码测试100w的客户端连接到服务器,这个参数就不需要,并把timeout的相关代码注释掉
//需要修改 /etc/security/limits.conf文件,让支持的文件数达到百万级的情况,建议使用多台虚拟机同时测试,
//另外服务器的内存需要16G以上,一般情况下,16G可能能支持到60-70w左右的连接情况。
long last_active; //最后一次设置该event的时间(也是最后一次活跃的时间,timeout的检测使用)
// http param - 只是demo这样写
int method; //方法,GET,POST等
char resource[BUFFER_LENGTH];//请求的资源
int ret_code;//服务器返回的状态码
};
struct eventblock {
struct eventblock *next;
struct ntyevent *events;//指向存放1024个连接的struct ntyevent对象的指针
};
struct ntyreactor {
int epfd; //epoll_create创建的epoll fd
int blkcnt;//eventblock的个数
struct eventblock *evblk; //指向eventblock对象的指针 -->支持100w级别的fd。
};
以下是结构体之间的联系,有一个全局的reactor,evblk指向第一个eventblock,每个eventblock对象的events指针指向1024个netyevent对象的第一个对象的内存。
1.初始化reactor 2.初始化监听socket 3.添加监听器到reactor 4.开启reactor主线程,持续监听所有的socket事件 5. 释放reactor工作初始化和释放reactor
初始化reactor工作很简单,主要是ntyreactor对象中的成员的初始化,主要包括:
1.创建epoll 2.分配一个事件块reactor block 3.分配一个连续的事件空间reactor
释放reactor工作也很简单
释放所有事件块(eventblock)空间和关闭epoll fd。创建监听socket和添加到reactor中
创建监听的socket的流程很简单,主要用于接受客户端的连接(connection),简单说下一个connection由一个五元组
1.就是创建一个非阻塞的socket,并进行端口绑定,然后开始监听客户端的连接 2.将监听的socket加入到reactor中: 2.1 判断监听socket和哪个event对象进行对应(后面会提及对应关系) 2.2 找到监听fd对应的event对象后,为该event对象设置event对象的fd,回调处理逻辑 2.3 将监听fd的event对象添加到epoll中进行管理定位socket和event的关系
由于reactor中管理了大量socket对应的事件,因此我们需要知道如何组织这些大量的socket,当我们创建好一个socket后(监听socket和读写socket),应该放在哪个块(eventblock)的哪个位置呢?
首先,我们需要知道创建服务器创建socket得到的socket值是连续分配的,比如,数字0,1,2代表标准输入,标准输出,错误,那么创建的socket值必须是大于2,假如第一次创建的socket为3,那么接下来创建的socket就是4,5,6…等等。
明白了socket创建返回的值是连续之后,我们就可以用连续的一块内存(数组)来和连续的socket进行对应,比如创建socket的返回值为655,那么用第一个块(eventblock)中下标为655的ntyevent对象中,如果socket的返回值大于1024小于2048,那么就在第二个块(eventblock)的某一个位置,其他的就不过多做说明。当然,在查找socket和event关系的过程中,会存在块的分配,代码如下:
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) {
int blkidx = sockfd / MAX_EPOLL_EVENTS; //找到块的位置,从0开始
while (blkidx >= reactor->blkcnt) {//需要重新分配内存来存放sockfd
ntyreactor_alloc(reactor);
}
int i = 0;
struct eventblock *blk = reactor->evblk;
while(i ++ < blkidx && blk != NULL) {
blk = blk->next;
}
return &blk->events[sockfd % MAX_EPOLL_EVENTS]; //找到具体sockfd对应的event位置
}
int ntyreactor_alloc(struct ntyreactor *reactor) {
if (reactor == NULL) return -1;
if (reactor->evblk == NULL) return -1;
struct eventblock *blk = reactor->evblk;
while (blk->next != NULL) {
blk = blk->next;
}
struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
if (evs == NULL) {
printf("ntyreactor_alloc ntyevents failedn");
return -2;
}
memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock));
if (block == NULL) {
printf("ntyreactor_alloc eventblock failedn");
return -2;
}
memset(block, 0, sizeof(struct eventblock));
block->events = evs;
block->next = NULL;
blk->next = block;
reactor->blkcnt ++; //
return 0;
}
完整代码
#include总结#include #include #include #include #include #include #include #include #include #include #include #define BUFFER_LENGTH 4096 #define MAX_EPOLL_EVENTS 1024 #define SERVER_PORT 8888 #define PORT_COUNT 1 #define HTTP_WEBSERVER_HTML_ROOT "html" #define HTTP_METHOD_GET 0 #define HTTP_METHOD_POST 1 typedef int NCALLBACK(int ,int, void*); struct ntyevent { int fd; int events; //EPOLLIN ,EPOLLOUT void *arg; //回调参数 int (*callback)(int fd, int events, void *arg);//回调函数 int status;//该事件是否处于reactor管理中 char buffer[BUFFER_LENGTH];//sendbuffer or recvbuffer int length; //buffer length //说明:如果想自己使用这个代码测试100w的客户端连接到服务器,这个参数就不需要,并把timeout的相关代码注释掉 //需要修改 /etc/security/limits.conf文件,让支持的文件数达到百万级的情况,建议使用多台虚拟机同时测试, //另外服务器的内存需要16G以上,一般情况下,16G可能能支持到60-70w左右的连接情况。 long last_active; //最后一次设置该event的时间(也是最后一次活跃的时间,timeout的检测使用) // http param - 只是demo这样写 int method; //方法,GET,POST等 char resource[BUFFER_LENGTH];//请求的资源 int ret_code;//服务器返回的状态码 }; struct eventblock { struct eventblock *next; struct ntyevent *events;//指向存放1024个连接的struct ntyevent对象的指针 }; struct ntyreactor { int epfd; //epoll_create创建的epoll fd int blkcnt;//eventblock的个数 struct eventblock *evblk; //指向eventblock对象的指针 -->支持100w级别的fd。 }; int recv_cb(int fd, int events, void *arg); int send_cb(int fd, int events, void *arg); //根据sockfd来确定事件应该存放到哪个块的哪个对象中 struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd); //根据参数设置event事件 void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) { ev->fd = fd; ev->callback = callback;//回调函数 ev->events = 0;//默认没有事件 ev->arg = arg;//回调参数-reactor对象 ev->last_active = time(NULL);//创建事件的时间 return ; } //将事件添加到reactor中 int nty_event_add(int epfd, int events, struct ntyevent *ev) { struct epoll_event ep_ev = {0, {0}}; //ev对应的epoll 事件 ep_ev.data.ptr = ev; ep_ev.events = ev->events = events; EPOLLIN , EPOLLOUT //事件可能会多次调用,因此会判断是修改还是添加事件 int op; if (ev->status == 1) { op = EPOLL_CTL_MOD; } else { op = EPOLL_CTL_ADD; ev->status = 1; } if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) { printf("event add failed [fd=%d], events[%d]n", ev->fd, events); return -1; } return 0; } //将事件从reactor中删除 int nty_event_del(int epfd, struct ntyevent *ev) { struct epoll_event ep_ev = {0, {0}}; //该事件已经从reactor中删除 if (ev->status != 1) { return -1; } ep_ev.data.ptr = ev;//删除事件也要将指针传入 ev->status = 0; epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev); return 0; } int readline(char *allbuf, int idx, char *linebuf) { int len = strlen(allbuf); for(;idx < len;idx ++) { if (allbuf[idx] == 'r' && allbuf[idx+1] == 'n') {//http协议的一行以rn结尾 return idx+2; } else { *(linebuf++) = allbuf[idx]; } } return -1; } int http_request(struct ntyevent *ev) { // GET, POST char linebuf[1024] = {0}; int idx = readline(ev->buffer, 0, linebuf); if (strstr(linebuf, "GET")) { ev->method = HTTP_METHOD_GET; //uri int i = 0; while (linebuf[sizeof("GET ") + i] != ' ') i++;//一直从GET后的空格到状态码之前的空格(2个空格都不包含) linebuf[sizeof("GET ")+i] = ' '; //注意,资源时放在当前文件夹下的html文件夹下 sprintf(ev->resource, "./%s/%s", HTTP_WEBSERVER_HTML_ROOT, linebuf+sizeof("GET ")); } else if (strstr(linebuf, "POST")) { } } int recv_cb(int fd, int events, void *arg) { struct ntyreactor *reactor = (struct ntyreactor*)arg; struct ntyevent *ev = ntyreactor_idx(reactor, fd); //如果使用epoll的水平触发方式(epoll的默认触发方式),如果没有读取完所有的 //请求内容,会继续读取,如果使用边沿触发方式,请使用 //while循环读完所有的请求内容后,在对其进行处理, //如果对epoll的水平触发和边沿触发不请求的同学可以自行 //百度 //但是我们这里只是测试,就一次性的读取了所有内容 int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0); // if (len > 0) { ev->length = len; ev->buffer[len] = ' '; printf("C[%d]:%sn", fd, ev->buffer); //http http_request(ev);//解析请求 //send(); //如果使用水平触发,这里不应该删除,下面也不要使用立刻使用nty_event_add触发写事件 //实际中需要判断某次请求的内容是否已经完全读取到,并使用http_request处理完,在触发 //写事件进行一定的处理,然后回发给客户端 nty_event_del(reactor->epfd, ev); nty_event_set(ev, fd, send_cb, reactor); nty_event_add(reactor->epfd, EPOLLOUT, ev);// } else if (len == 0) { nty_event_del(reactor->epfd, ev); close(ev->fd); //printf("[fd=%d] pos[%ld], closedn", fd, ev-reactor->events); } else { nty_event_del(reactor->epfd, ev); close(ev->fd); printf("recv[fd=%d] error[%d]:%sn", fd, errno, strerror(errno)); } return len; } int http_response(struct ntyevent *ev) { if (ev == NULL) return -1; memset(ev->buffer, 0, BUFFER_LENGTH); #if 0 const char *html = " hello http Kingrnrn"; ev->length = sprintf(ev->buffer, "HTTP/1.1 200 OKrn Date: Thu, 11 Nov 2021 12:28:52 GMTrn Content-Type: text/html;charset=ISO-8859-1rn Content-Length: 83rnrn%s", html); #else int filefd = open(ev->resource, O_RDONLY); if (filefd == -1) { // return 404, 服务器不存在对应的资源文件 ev->ret_code = 404; ev->length = sprintf(ev->buffer, "HTTP/1.1 404 Not Foundrn" "Date: Thu, 11 Nov 2021 12:28:52 GMTrn" "Content-Type: text/html;charset=ISO-8859-1rn" "Content-Length: 85rnrn" "404 Not Found 404rnrn" ); } else { struct stat stat_buf; fstat(filefd, &stat_buf); close(filefd); if (S_ISDIR(stat_buf.st_mode)) { //路径 ev->ret_code = 404; ev->length = sprintf(ev->buffer, "HTTP/1.1 404 Not Foundrn" "Date: Thu, 11 Nov 2021 12:28:52 GMTrn" "Content-Type: text/html;charset=ISO-8859-1rn" "Content-Length: 85rnrn" "404 Not Found 404rnrn" ); } else if (S_ISREG(stat_buf.st_mode)) { //文件 ev->ret_code = 200; ev->length = sprintf(ev->buffer, "HTTP/1.1 200 OKrn" "Date: Thu, 11 Nov 2021 12:28:52 GMTrn" "Content-Type: text/html;charset=ISO-8859-1rn" "Content-Length: %ldrnrn", stat_buf.st_size ); } } #endif return ev->length; } int send_cb(int fd, int events, void *arg) { struct ntyreactor *reactor = (struct ntyreactor*)arg; struct ntyevent *ev = ntyreactor_idx(reactor, fd); http_response(ev); // int len = send(fd, ev->buffer, ev->length, 0);//发送头的内容 if (len > 0) { printf("send[fd=%d], [%d]%sn", fd, len, ev->buffer); if (ev->ret_code == 200) { int filefd = open(ev->resource, O_RDONLY); struct stat stat_buf; fstat(filefd, &stat_buf); //发送body(文件内容)---可以看看这个函数的作用 //简单的说就是直接将文件内容发送给客户端,避免了 //很多拷贝工作 sendfile(fd, filefd, NULL, stat_buf.st_size); close(filefd); } nty_event_del(reactor->epfd, ev); nty_event_set(ev, fd, recv_cb, reactor); nty_event_add(reactor->epfd, EPOLLIN, ev); } else { close(ev->fd); nty_event_del(reactor->epfd, ev); printf("send[fd=%d] error %sn", fd, strerror(errno)); } return len; } int accept_cb(int fd, int events, void *arg) { struct ntyreactor *reactor = (struct ntyreactor*)arg; if (reactor == NULL) return -1; struct sockaddr_in client_addr; socklen_t len = sizeof(client_addr); int clientfd; if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1) { if (errno != EAGAIN && errno != EINTR) { } printf("accept: %sn", strerror(errno)); return -1; } int flag = 0; if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) { printf("%s: fcntl nonblocking failed, %dn", __func__, MAX_EPOLL_EVENTS); return -1; } struct ntyevent *event = ntyreactor_idx(reactor, clientfd); nty_event_set(event, clientfd, recv_cb, reactor); nty_event_add(reactor->epfd, EPOLLIN, event); printf("new connect [%s:%d], pos[%d]n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd); return 0; } int init_sock(short port) { int fd = socket(AF_INET, SOCK_STREAM, 0); fcntl(fd, F_SETFL, O_NONBLOCK); struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_port = htons(port); bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr)); if (listen(fd, 20) < 0) { printf("listen failed : %sn", strerror(errno)); } return fd; } int ntyreactor_alloc(struct ntyreactor *reactor) { if (reactor == NULL) return -1; if (reactor->evblk == NULL) return -1; struct eventblock *blk = reactor->evblk; while (blk->next != NULL) { blk = blk->next; } struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent)); if (evs == NULL) { printf("ntyreactor_alloc ntyevents failedn"); return -2; } memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent)); struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock)); if (block == NULL) { printf("ntyreactor_alloc eventblock failedn"); return -2; } memset(block, 0, sizeof(struct eventblock)); block->events = evs; block->next = NULL; blk->next = block; reactor->blkcnt ++; // return 0; } struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd) { int blkidx = sockfd / MAX_EPOLL_EVENTS; while (blkidx >= reactor->blkcnt) {//需要分配块和相应的事件内存 ntyreactor_alloc(reactor); } int i = 0; struct eventblock *blk = reactor->evblk; while(i ++ < blkidx && blk != NULL) { blk = blk->next; } return &blk->events[sockfd % MAX_EPOLL_EVENTS]; } int ntyreactor_init(struct ntyreactor *reactor) { if (reactor == NULL) return -1; memset(reactor, 0, sizeof(struct ntyreactor)); reactor->epfd = epoll_create(1); if (reactor->epfd <= 0) { printf("create epfd in %s err %sn", __func__, strerror(errno)); return -2; } struct ntyevent *evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent)); if (evs == NULL) { printf("ntyreactor_alloc ntyevents failedn"); return -2; } memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent)); struct eventblock *block = (struct eventblock *)malloc(sizeof(struct eventblock)); if (block == NULL) { printf("ntyreactor_alloc eventblock failedn"); return -2; } memset(block, 0, sizeof(struct eventblock)); block->events = evs; block->next = NULL; reactor->evblk = block; reactor->blkcnt = 1; return 0; } int ntyreactor_destory(struct ntyreactor *reactor) { close(reactor->epfd); //free(reactor->events); struct eventblock *blk = reactor->evblk; struct eventblock *blk_next = NULL; while (blk != NULL) { blk_next = blk->next; free(blk->events); free(blk); blk = blk_next; } return 0; } int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) { if (reactor == NULL) return -1; if (reactor->evblk == NULL) return -1; //reactor->evblk->events[sockfd]; struct ntyevent *event = ntyreactor_idx(reactor, sockfd); nty_event_set(event, sockfd, acceptor, reactor); nty_event_add(reactor->epfd, EPOLLIN, event); return 0; } int ntyreactor_run(struct ntyreactor *reactor) { if (reactor == NULL) return -1; if (reactor->epfd < 0) return -1; if (reactor->evblk == NULL) return -1; struct epoll_event events[MAX_EPOLL_EVENTS+1]; int checkpos = 0, i; while (1) { //timeout监测,每次循环检测100个 long now = time(NULL); for (i = 0;i < 100;i ++, checkpos ++) { if (checkpos == MAX_EPOLL_EVENTS) { checkpos = 0; } if (reactor->events[checkpos].status != 1) { continue; } long duration = now - reactor->events[checkpos].last_active; if (duration >= 60) { close(reactor->events[checkpos].fd); printf("[fd=%d] timeoutn", reactor->events[checkpos].fd); nty_event_del(reactor->epfd, &reactor->events[checkpos]); } } //如果有事件发生,就会往下走,或者1000ms后也往后走 int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000); if (nready < 0) { printf("epoll_wait error, exitn"); continue; } for (i = 0;i < nready;i ++) { struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr; //发生了读事件 if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) { ev->callback(ev->fd, events[i].events, ev->arg); } //发生了写事件 if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) { ev->callback(ev->fd, events[i].events, ev->arg); } } } } // 3, 6w, 1, 100 == //int main(int argc, char *argv[]) { unsigned short port = SERVER_PORT; // listen 8888 if (argc == 2) { port = atoi(argv[1]); } struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor)); ntyreactor_init(reactor); int i = 0; int sockfds[PORT_COUNT] = {0}; for (i = 0;i < PORT_COUNT;i ++) { sockfds[i] = init_sock(port+i); ntyreactor_addlistener(reactor, sockfds[i], accept_cb); } ntyreactor_run(reactor); ntyreactor_destory(reactor); for (i = 0;i < PORT_COUNT;i ++) { close(sockfds[i]); } free(reactor); return 0; }
在网络服务器开发中,reactor模型是非常重要的,比如常见的开源网络库libevent,libev等都使用了reactor模型,总的来说,reactor模型就是将大量的fd转换为对应的事件,并集中进行管理。
在代码中并简单的实现了http server 的一个demo。



