- 15.1 进程池和线程池概述
- 15.2 处理多客户
- 15.3 半同步/半异步线程池实现
- 15.4 使用进程池实现的简单的CGI服务器
- 15.5 半同步/半反应堆线程池实现
- 15.6 使用线程池实现的简单Web服务器
- 15.6.1 http_conn类
- 15.6.2 main函数
动态创建子进程或子线程的缺点:
- 动态创建进程或线程比较耗费时间,将导致较慢的客户响应;
- 动态从创建的子进程或子线程通常只能为一个客户服务,将导致系统上产生大量的细微进程或线程,且进程、线程间切换将消耗大量的CPU事件;
- 动态创建的子进程是当前进程的完整映像,当前进程必须谨慎地管理其分配的文件描述符和堆内存等系统资源,否则子进程可能会复制这些资源。
进程池是由服务器预先创建一组子进程,从而实现并发。线程池中的线程数量应该和CPU数量差不多。进程池中所有子进程都运行着相同的代码,并具有相同的属性。由于进程池在服务启动之初就创建好了,所以每个子进程都相对“干净”。
当有新任务到来时,主进程将通过某种方式选择进程池中的某一个子进程来为其服务,其主要通过两种方式:
- 主进程使用某种算法来主动选择子进程;
- 主进程和所有子进程通过一个共享的工作队列来同步,子进程都睡眠在该工作队列上。
主进程需要某种通知机制来告诉紫禁城由新任务需要处理,并传递必要的数据。最简单的方法是在父进程和子进程之间预先建立好一条管道,然后通过该管道来实现所有的进程间通信;也可以将这些数据定义为全局的。
15.2 处理多客户
当客户的任务是无状态的,可以考虑使用不同的子进程来为该客户的不同请求服务。
15.3 半同步/半异步线程池实现
#ifndef PROCESSPOOL_H #define PROCESSPOOL_H #include#include #include #include #include #include #include #include #include #include #include #include #include #include #include // 描述一个子进程的类,m_pid是目标子进程的PID,m_pipefd是父进程和子进程通信的管道 class process{ public: process():m_pid(1){} public: pid_t m_pid; int m_pipefd[2]; }; // 进程池类,将其定义为模板类,模板参数是处理逻辑任务的类 template class processpool{ private: // 将构造函数定义为私有,只能通过creat静态函数创建实例 processpool(int listenfd, int process_number = 8); public: // 单体模式,保证程序最多创建一个processpool实例 static processpool * create(int listenfd, int process_number = 8){ if(!m_instance){ m_instance = new processpool (listenfd, process_number); } return m_instance; } ~processpool(){ delete [] m_sub_process; } // 启动进程池 void run(); private: // 统一事件源 void setup_sig_pipe(); void run_parent(); void run_child(); private: // 进程池允许的最大子进程数量 static const int MAX_PROCESS_NUMBER = 16; // 每个子进程最多能处理的客户数量 static const int USER_PER_PROCESS = 65536; // epoll最多能处理的事件数 static const int MAX_EVENT_NUMBER = 10000; // 进程池中的进程总数 int m_process_number; // 子进程在池中的序号,从0开始 int m_idx; // 每个进程都有一个epoll内核事件表,用m_epollfd标识 int m_epollfd; // 监听sockte int m_listenfd; // 子进程通过m_stop决定是否停止运行 int m_stop; // 保存所有子进程的描述信息 process* m_sub_process; // 进程池静态实例 static processpool * m_instance; }; template processpool * processpool ::m_instance = NULL; // 用于处理信号的管道,以实现统一事件源,后面称之为信号管道 static int sig_pipefd[2]; static int setnonblocking(int fd){ int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } static void addfd(int epollfd, int fd){ epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } // 从epollfd标志的epoll内核事件表中删除fd上的所有注册事件 static void removefd(int epollfd, int fd){ epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0); close(fd); } static void sig_handler(int sig){ int save_errno = errno; int msg = sig; send(sig_pipefd[1], (char*)&msg, 1, 0); errno = save_errno; } static void addsig(int sig, void(handler)(int), bool restart = true){ struct sigaciton sa; memset(&sa, ' ', sizeof(sa)); sa.sa_handler = handler; if(restart){ sa.sa_flags |= SA_RESTART; } sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL)!=-1); } // 进程池构造函数,参数listenfd是监听socket,process_number指定进程池中子进程的数量 template processpool ::processpool(int listenfd, int process_number) :m_listenfd(listenfd),m_process_number(process_number), m_idx(-1),m_stop(false){ assert((process_number>0)&&(process_number<=MAX_PROCESS_NUMBER)); m_sub_process = new process[process_number]; //创建process_number个子进程,并建立它们和夫进程之间的管道 for(int i=0;i =0); if(m_sub_process[i].m_pid>0){ close(m_sub_process[i].m_pipefd[1]); continue; } else{ close(m_sub_process[i].m_pipefd[0]); m_idx = i; break; } } } // 统一事件源 template void processpool ::setup_sig_pipe(){ // 创建epoll事件监听表和信号管道 m_epollfd = epoll_create(5); assert(m_epollfd != -1); int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd); assert(ret!=-1); setnonblocking(sig_pipefd[1]); addfd(m_epollfd, sig_pipefd[0]); // 设置信号处理函数 addsig(SIGHLD, sig_handler); addsig(SIGTERM, sig_handler); addsig(SIGINT, sig_handler); addsig(SIGPIPE, SIG_IGN); } // 父进程中m_idx为-1,子进程m_idx值大于等于0,由此判断运行父进程代码还是子进程代码 template void processpool ::run(){ if(m_idx != -1){ run_child(); return; } run_parent(); } template void processpool ::run_child(){ setup_sig_pipe(); // 每个子进程通过其在进程池的信号值m_idx找到与父进程通信的管道 int pipefd = m_sub_process[m_idx].m_pipefd[1]; // 子进程监听管道信号描述符pipefd,父进程通过其来通知子进程accept新连接 addfd(m_epollfd, pipefd); epoll_event events[MAX_EVENT_NUMBER]; T* users = new T[USER_PER_PROCESS]; assert(users); int number = 0; int ret = -1; while(!m_stop){ number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); if((number<0)&&(errno!=EINTR)){ printf("epoll failuren"); break; } for(int i=0;i 0){ continue; } break; } case SIGTERM: case SIGINT:{ m_stop = true; break; } default:{ break; } } } } } // 如果是其他可读数据, 必然是客户请求到来,调用逻辑处理对象的process方法处理 else if(events[i].events&EPOLLIN){ users[sockfd].process(); } else{ continue; } } } delete [] users; users = NULL; close(pipefd); // colse(m_listenfd); // 应该由m_listenfd创建者关闭 close(m_epollfd); } template void processpool ::run_parent(){ setup_sig_pipe(); // 父进程监听m_listenfd addfd(m_epollfd, m_listenfd); epoll_event events[MAX_EVENT_NUMBER]; int sub_process_counter = 0; int new_conn = 1; int number = 0; int ret = -1; while(!m_stop){ number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); if((number<0)&&(errno!=EINTR)){ printf("epoll failuren"); break; } for(int i=0;i 0){ for(int i=0;i
15.4 使用进程池实现的简单的CGI服务器#include#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "./processpool.h" // 用于处理客户CGI请求的类,可以作为processpool类的模板参数 class cgi_conn{ public: cgi_conn(){} ~cgi_conn(){} // 初始化客户端,清空缓冲区 void init(int epollfd, int sockfd, const sockaddr_in& client_addr){ m_epollfd = epollfd; m_sockfd = sockfd; m_address = client_addr; memset(m_buf, ' ', BUFFER_SIZE); m_read_idx = 0; } void process(){ int idx = 0; int ret = -1; // 循环读取和分析客户数据 while(true){ idx = m_read_idx; ret = recv(m_sockfd, m_buf+idx, BUFFER_SIZE-1-idx, 0); // 若读操作发生错误,则关闭客户连接,如果是暂时无数据可读,则退出循环 if(ret<0){ if(errno!=EAGAIN){ removefd(m_epollfd, m_sockfd); } break; } // 如果对方关闭连接,则服务器也关闭连接 else if(ret==0){ removefd(m_epollfd, m_sockfd); break; } else{ m_read_idx += ret; printf("user content is: %sn", m_buf); //如果遇到"rn",则开始处理客户请求 for(;idx =1)&&(m_buf[idx-1]=='r')&&(m_buf[idx]=='n')){ break; } } // 如果没遇到字符"rn",则需要处理更多客户数据 if(idx==m_read_idx){ continue; } m_buf[idx-1] = ' '; char* file_name = m_buf; // 判断客户要运行的CGI程序是否存在 if(access(file_name, F_OK)==-1){ removefd(m_epollfd,m_sockfd); break; } // 创建子进程执行CGI程序 ret = fork(); if(ret == -1){ removefd(m_epollfd, m_sockfd); break; } else if(ret>0){ // 父进程只需要关闭连接 removefd(m_epollfd, m_sockfd); break; } else{ // 子进程将标准输出定向到m_sockfd,并执行CGI程序 close(STDOUT_FILENO); dup(m_sockfd); execl(m_buf, m_buf, 0); exit(0); } } } } private: // 读缓冲区的大小 static const int BUFFER_SIZE = 1024; static int m_epollfd; int m_sockfd; sockaddr_in m_address; char m_buf[BUFFER_SIZE]; // 标记缓冲区中已经读入客户数据最后一个自己的下一个位置 int m_read_idx; }; int cgi_conn::m_epollfd = -1; // 主函数 int main(int argc, char* argv[]){ if(argc<=2){ printf("usage: %s ip_address port_number n", basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[2]); int listenfd = socket(PF_INET, SOCK_STERAM, 0); assert(listenfd>=0); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); assert(ret!=-1); ret = listen(listenfd, 5); assert(ret!=-1); processpool * pool = processpool ::create(listenfd); if(pool){ pool->run(); delete pool; } close(listenfd); return 0; }
15.5 半同步/半反应堆线程池实现#ifndef THREADPOOL_H #define THREADPOOL_H #include#include
#include #include #include "./locker.h" // 线程池类 template class threadpool{ public: threadpool(int thread_number=8,int max_requests=10000); ~threadpool(); // 往请求队列中添加任务 bool append(T* request); private: // 工作线程运行函数,不断从工作队列中取出任务并执行 static void* worker(void* arg); void run(); private: int m_thread_number; // 线程数 int m_max_requests; // 请求队列中允许的最大请求数 pthread_t* m_threads; // 线程池的组数 std::list m_workqueue; // 请求队列 locker m_queuelocker; // 请求队列的互斥锁 sem m_queuestat; // 是否有任务需要处理 bool m_stop; // 是否结束线程 }; template threadpool ::threadpool(int thread_number, int max_requests): m_thread_number(thread_number),m_max_requests(max_requests), m_stop(false),m_threads(NULL){ if((thread_number<=0)||(max_requests<=0)){ throw std::exception(); } m_thread = new pthread_t(m_thread_number); if(!m_threads){ throw std::exception(); } // 创建thread_number个线程,并设置为脱离线程 for(int i=0;i threadpool ::~threadpool(){ delete [] m_threads; m_stop = true; } template bool threadpool ::append(T* requests){ // 操作工作队列要加锁 m_queuelocker.lock(); if(m_queuelocker.size()>m_max_requests){ m_queuelocker.unlock(); return false; } m_workqueue.push_back(requests); m_queuelocker.unlock(); m_queuestat.post(); // 添加信号量 return true; } template void* threadpool ::worker(void* arg){ threadpool* pool = (threadpool*)arg; pool->run(); return pool; } template void threadpool ::run(){ while(!m_stop){ m_queuestat.wait(); m_queuelocker.lock(); if(m_workqueue.empty()){ m_queuelocker.unlock(); continue; } T* request = m_workqueue.front(); m_workqueue.pop_front(); m_queuelocker.unlock(); if(!request){ continue; } request->process(); } } #endif // !THREADPOOL_H
15.6 使用线程池实现的简单Web服务器 15.6.1 http_conn类http_conn.h文件
#ifndef HTTPCONNECTION_H #define HTTPCONNECTION_H #include#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "./locker.h" class http_conn{ public: // 文件名的最大长度 static const int FILENAME_LEN = 200; // 读缓冲区的大小 static const int READ_BUFFER_SIZE = 2048; // 写缓冲区的大小 static const int WRITE_BUFFER_SIZE = 1024; // HTTP请求方法,仅支持GET enum METHOD {GET = 0, POST, HEAD, PUT, DELETE, TRACE, OPTIONS, CONNECT, PATCH}; // 解析客户请求时,主状态机所处的状态 enum CHECK_STATE { CHECK_STATE_REQUESTLINE = 0, // 当前正在分析请求行 CHECK_STATE_HEADER, // 当前正在分析头部字段 CHECK_STATE_ConTENT }; // 从在状态机的三种可能状态,即行的读取状态 enum LINE_STATUS { LINE_OK = 0, // 读取到一个完整的行 LINE_BAD, // 行出错 LINE_OPEN // 行数据尚且不完整 }; // 服务器处理HTTP请求的结果 enum HTTP_CODE { NO_REQUEST, // 请求不完整,需要继续获取客户数据 GET_REQUEST, // 获得了一个完整的客户请求 BAD_REQUEST, // 客户请求有语法错误 NO_RECOURCE, FORBIDDEN_REQUEST, // 客户对资源没有足够的访问权限 FILE_REQUEST, INTERNAL_ERROR, // 服务器内部错误 CLOSE_ConNECTION // 客户端已经关闭连接 }; public: http_conn(){} ~http_conn(){} public: // 初始化新接受的连接 void init(int sockfd, const sockaddr_in& addr); // 关闭连接 void close_conn(bool real_close = true); // 处理客户请求 void process(); // 非阻塞读操作 bool read(); // 非阻塞写操作 bool write(); private: // 初始化连接 void init(); // 解析HTTP请求 HTTP_CODE process_read(); // 填充HTTP应答 bool process_write(HTTP_CODE ret); // 被process_read调用以分析HTTP请求 HTTP_CODE parse_request_line(char* text); HTTP_CODE parse_headers(char* text); HTTP_CODE parse_content(char* text); HTTP_CODE do_request(); char* get_lin() { return m_read_buf+m_start_line; } LINE_STATUS parse_line(); // 被process_write调用以填充HTTP应答 void unmap(); bool add_response(const char* format, ... ); bool add_content(const char* content); bool add_status_line(int status, const char* title); bool add_headers(int content_length); bool add_content_length(int content_length); bool add_linger(); bool add_blank_line(); public: // 所有socket上的事件都被注册到同一个epoll内核事件中 static int m_epollfd; // 统计用户数量 static int m_user_count; private: // 该HTTP连接的socket和对方的socket地址 int m_sockfd; sockaddr_in m_address; // 读缓冲区 char m_read_buf[READ_BUFFER_SIZE]; // 标识都缓冲住已经读入客户端数据的最后一个字节的下一个位置 int m_read_idx; // 当前正在分析的字符在都缓冲区的位置 int m_checked_idx; // 当前正在解析的行的起始位置 int m_start_line; // 写缓冲区 char m_write_buf[WRITE_BUFFER_SIZE]; // 写缓冲区中待发送的字节数 int m_write_idx; // 主状态机当前所处的状态 CHECK_STATE m_check_state; // 请求方法 METHOD m_method; // 客户请求的目标文件的完整路径 char m_real_file[FILENAME_LEN]; // 客户请求的目标文件的文件名 char* m_url; // HTTP协议版本号,仅支持HTTP/1.1 char* m_version; // 主机名 char* m_host; // HTTP请求的消息体长度 int m_content_length; // HTTP请求是否保持连接 bool m_linger; // 客户请求的目标文件被mmap到内存中的起始位置 char* m_file_address; // 目标文件的状态 struct stat m_file_stat; // 采用writev执行写操作 struct iovec m_iv[2]; // 被写内存块的数量 int m_iv_count; }; #endif // !HTTPCONNECTION_H http_conn.cpp文件
#include "./http_conn.h" // 定义HTTP响应的一些状态信息 const char* ok_200_title = "OK"; const char* error_400_title = "Bad Request"; const char* error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.n"; const char* error_403_title = "Forbidden"; const char* error_403_form = "You do not have permission to get file from this server.n"; const char* error_404_title = "Not Found"; const char* error_404_form = "The requested file was not found on this server.n"; const char* error_500_title = "Internal Error"; const char* error_500_form = "There was an unusual problen serving the requested file.n"; // 网站的根目录 const char* doc_root = "/var/www/html"; static int setnonblocking(int fd){ int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } // 将fd上的EPOLLIN和EPOLLET事件注册到epollfd指示上的epoll内核事件中 // 参数oneshot指定是否注册fd上的EPOLLONESHOT事件 static void addfd(int epollfd, int fd, bool one_shot){ epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; if(one_shot){ event.events |= EPOLLONESHOT; } epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } // 从epollfd标志的epoll内核事件表中删除fd上的所有注册事件 static void removefd(int epollfd, int fd){ epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0); close(fd); } void modfd(int epollfd, int fd, int ev){ epoll_event event; event.data.fd = fd; event.events = ev | EPOLLET | EPOLLonESHOT | EPOLLRDHUP; epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event); } int http_conn::m_user_count = 0; int http_conn::m_epollfd = -1; void http_conn::close_conn(bool real_close){ if(real_close && (m_sockfd != -1)){ removefd(m_epollfd, m_sockfd); m_sockfd = -1; m_user_count--; // 关闭连接时,客户总量-1 } } void http_conn::init(int sockfd, const sockaddr_in& addr){ m_sockfd = sockfd; m_address = addr; // 下面两行避免TIME_WAIT状态,仅用于调试,实际使用应当去掉 int reuse = 1; setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); addfd(m_epollfd, sockfd, true); m_user_count++; init(); } void http_conn::init(){ m_check_state = CHECK_STATE_REQUESTLINE; m_linger = false; m_method = GET; m_url = 0; m_version = 0; m_content_length = 0; m_host = 0; m_start_line = 0; m_checked_idx = 0; m_read_idx = 0; m_write_idx = 0; memset(m_read_buf, ' ', READ_BUFFER_SIZE); memset(m_write_buf, ' ', WRITE_BUFFER_SIZE); memset(m_real_file, ' ', FILENAME_LEN); } // 从状态机,用于解析出一行内容 http_conn::LINE_STATUS http_conn::parse_line(){ char temp; // checked_index指向buffer当前分析的字节 // read_index指向buffer中客户数据为不得下一个字节 // 第checked_index~(read_index-1)字节右下面进行分析 for(; m_checked_idx


