- 服务端并发的概念
- 常见单机服务模型
- 并发测试方法
- socket数量的限制(描述符数量的限制)
- 客户端测试代码
- 测试结果
- 改进代码提升测试并发量
- 1 单线程多端口同时监听
- 修改代码:
- 测试现象:
- 分析:
- 解决方法:
- 实际的做法
- 2 增加多线程处理
- 现象:处理时间存在明显抖动
- 原因:
- 处理方法:多线程accept
并发量:同时承载的客户端的数量,实质上就是同时能够维护的 socket 的数量
比如要实现10W客户端同时在线,200ms内正常返回结果,承载量的影响因素:
- 数据库
- 网络带宽
- 内存操作
- 日志
测试服务器十万、百万并发能力的意义更多地在于测试服务器是否具备这种能力,它并不是与业务直接相关的。
常见单机服务模型- 单线程同步:NTP
- 多线程同步:Natty
- 纯异步:Redis、HAProxy
- 半同步半异步:Natty
- 多进程同步:fastcgi
- 多线程异步:memcached
- 多进程异步:Nginx
- 一个请求一个进程/线程:Apache/CGI
- 微进程框架:erlang/go/lua
- 协程框架:libco/ntyco
实际上,我们之前基于epoll实现的Reactor模型的简单echo服务器就应经可以实现几十万量级的并发了,我们可以测试一下。测试之前需要先修改一下描述符数量的限制,否则可用的描述符资源会耗尽。服务端测试代码为之前这篇博文中的Reactor模型的代码。
socket数量的限制(描述符数量的限制)两种修改方法:
-
ulimit -n
这种方法不能持久,重启后就得重新设置 -
vim /etc/security/limits.conf
limits.conf文件的格式如下:
- domain就是指用户;
- 限制策略的type分soft软和hard硬,所谓hard就是硬性限制,数量达到value这个上限就不分配了;soft相对宽松,达到上限后开始回收;
- item:对于文件描述符的数量限制,其对应的item就是 nofile - max number of open files
增加两行行:
* soft nofile 1048576 * hard nofile 1048576客户端测试代码
#include#include #include #include #include #include #include #include #include #include #include #include #define MAX_CONNECTION 340000 // 单个客户端建立34W个连接,3个客户端就有100W #define MAX_BUFSIZE 128 #define MAX_EPOLLSIZE (384*1024) #define MAX_PORT 100 // 最大端口数量 #define TIME_MS_USED(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) // 用于计算耗时 static int fdSetNonBlock(int fd) { int flags; flags = fcntl(fd, F_GETFL); if(flags < 0) return flags; flags |= O_NONBLOCK; if(fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } static int sockSetReuseAddr(int sockfd) { int reuse = 1; return setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); } int main(int argc, char *argv[]) { if(argc < 3) { printf("Usage: %s ip port", argv[0]); return 0; } const char* ip = argv[1]; int port = atoi(argv[2]); int connections = 0; // 建立连接的计数,用于统计 char buffer[MAX_BUFSIZE] = {0}; int i; int clientFinishTest = 0; int portOffset = 0; struct epoll_event events[MAX_EPOLLSIZE]; int epfd = epoll_create(1); struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(ip); struct timeval loopBegin, loopEnd; gettimeofday(&loopBegin, NULL); while(1) { struct epoll_event ev; int sockfd = 0; if(connections < MAX_CONNECTION) // 连接数量未达到最大值就一直新建连接并connect到服务端 { sockfd = socket(AF_INET, SOCK_STREAM, 0); if(sockfd < 0) { perror("socket"); goto errExit; } addr.sin_port = htons(port + portOffset); portOffset = (portOffset + 1) % MAX_PORT; // 均匀地使用这100个端口 if(connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { perror("connect"); goto errExit; } fdSetNonBlock(sockfd); sockSetReuseAddr(sockfd); // 发送数据送缓冲区 snprintf(buffer, MAX_BUFSIZE - 1, "Hello, this is client No.%d port %d ", connections, port + portOffset); send(sockfd, buffer, strlen(buffer), 0); ev.events = EPOLLIN | EPOLLOUT; ev.data.fd = sockfd; epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); // 当前连接添加到epoll中进行监视 connections++; // 连接数增加 } // 每增加1000个连接就执行一次 if(connections % 1000 == 0 || connections == MAX_CONNECTION) { if(!clientFinishTest) // 当前客户端的测试尚未结束 { gettimeofday(&loopEnd, NULL); int msPer1K = TIME_MS_USED(loopEnd, loopBegin); gettimeofday(&loopBegin, NULL); // 进入下一轮计数 printf("-- connections: %d, sockfd:%d, time_used:%dn", connections, sockfd, msPer1K); } int nready = epoll_wait(epfd, events, connections, 100); for(i = 0; i < nready; i++) { int clientfd = events[i].data.fd; if (events[i].events & EPOLLOUT) { sprintf(buffer, "data from %d", clientfd); send(sockfd, buffer, strlen(buffer), 0); } else if (events[i].events & EPOLLIN) { char rBuffer[MAX_BUFSIZE] = {0}; ssize_t length = recv(sockfd, rBuffer, MAX_BUFSIZE, 0); if (length > 0) { printf("# Recv from server:%sn", rBuffer); } else if (length == 0) { printf("# Disconnected. clientfd:%dn", clientfd); connections --; close(clientfd); epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &events[i]); } else { if (errno == EINTR || errno == EAGAIN) continue; printf(" Error clientfd:%d, errno:%dn", clientfd, errno); close(clientfd); epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &events[i]); } } else { printf(" clientfd:%d, unknown events:%dn", clientfd, events[i].events); close(clientfd); epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &events[i]); } } if(connections == MAX_ConNECTION && !clientFinishTest) { printf("# Client finished the test.n"); clientFinishTest = 1; } } usleep(1000); // 短暂休眠1ms再继续 } return 0; errExit: printf("error : %sn", strerror(errno)); return 0; }
此时,宏定义MAX_PORT设为1,服务端只开启一个端口。
同时,我们先限制了客户端建立的连接数量为34W,看看此时单个客户端对服务器是否能够达到这个量级。
此时服务端只有一个端口监听时,而客户端的可用端口是有限的,因此当端口数量达到一定值后(此处为2.8W)就会报Cannot assign requested address的错误。
改进代码提升测试并发量 1 单线程多端口同时监听为了进一步增加可建立的连接数量,在服务端起100个监听端口并将所有监听 socket 描述符丢到epoll中,客户端循环连接到这100个端口。为什么这么做呢?可以看下文关于socket五元组的分析。
修改代码:修改服务器端的代码如下:
#include#include #include #include #include #include #include #include #include #include #include #define MAX_PORT 100 // 建立的最大端口数 #define MAX_BUFFER_SIZE 1024 struct sockitem { int sockfd; int (*callback)(int fd, int events, void *arg); int epfd; char recvbuffer[MAX_BUFFER_SIZE]; // 接收缓冲 char sendbuffer[MAX_BUFFER_SIZE]; // 发送缓冲 int recvlength; // 接收缓冲区中的数据长度 int sendlength; // 发送缓冲区中的数据长度 }; #define MAX_EVENTS_NUM 512 struct reactor { int epfd; struct epoll_event events[MAX_EVENTS_NUM]; }; int recv_cb(int fd, int events, void *arg); int send_cb(int fd, int events, void *arg) { struct sockitem *si = arg; struct epoll_event ev; int clientfd = si->sockfd; int ret = send(clientfd, si->sendbuffer, si->sendlength, 0); si->callback = recv_cb; // 切回读的回调 ev.events = EPOLLIN; ev.data.ptr = si; epoll_ctl(si->epfd, EPOLL_CTL_MOD, si->sockfd, &ev); return ret; } int recv_cb(int fd, int events, void *arg) { struct sockitem *si = arg; struct epoll_event ev; int clientfd = si->sockfd; int ret = recv(clientfd, si->recvbuffer, MAX_BUFFER_SIZE, 0); if(ret <= 0) { if(ret < 0) { if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { // 被打断直接返回的情况 return ret; } } else { printf("# client disconnected...n"); } close(clientfd); ev.events = EPOLLIN; ev.data.ptr = si; epoll_ctl(si->epfd, EPOLL_CTL_DEL, clientfd, &ev); free(si); } else { //printf("# recv data from fd:%d : %s , len = %dn", clientfd, si->recvbuffer, ret); si->recvlength = ret; memcpy(si->sendbuffer, si->recvbuffer, si->recvlength); // 将接收到的数据拷贝的发送缓冲区 si->sendlength = si->recvlength; si->callback = send_cb; // 回调函数要切换成写回调 struct epoll_event ev; ev.events = EPOLLOUT | EPOLLET; // 写的时候最好还是用ET ev.data.ptr = si; epoll_ctl(si->epfd, EPOLL_CTL_MOD, si->sockfd, &ev); } return ret; } int accept_cb(int fd, int events, void *arg) { struct sockitem *si = arg; struct epoll_event ev; struct sockaddr_in client; memset(&client, 0, sizeof(struct sockaddr_in)); socklen_t caddr_len = sizeof(struct sockaddr_in); int clientfd = accept(si->sockfd, (struct sockaddr*)&client, &caddr_len); if(clientfd < 0) { printf("# accept errorn"); return clientfd; } char str[INET_ADDRSTRLEN] = {0}; printf("recv from %s:%dn", inet_ntop(AF_INET, &client.sin_addr, str, sizeof(str)), ntohs(client.sin_port)); struct sockitem *client_si = (struct sockitem*)malloc(sizeof(struct sockitem)); client_si->sockfd = clientfd; client_si->callback = recv_cb; // accept完的下一步就是接收客户端数据 client_si->epfd = si->epfd; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; ev.data.ptr = client_si; epoll_ctl(si->epfd, EPOLL_CTL_ADD, clientfd, &ev); // 把客户端socket增加到epoll中监听 return clientfd; } int init_port_and_listen(int port, int epfd) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) return -1; struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_addr.s_addr = INADDR_ANY; addr.sin_port = htons(port); if(bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) return -2; if(listen(sockfd, 5) < 0) return -3; struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem)); // 自定义数据,用于传递给回调函数 si->sockfd = sockfd; si->callback = accept_cb; si->epfd = epfd; // sockitem 中增加一个epfd成员以便回调函数中使用 struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN; // 默认LT ev.data.ptr = si; epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); // 添加到事件到epoll return sockfd; } int main(int argc, char* argv[]) { int port = atoi(argv[1]); int i; struct sockitem *si; struct reactor ra; ra.epfd = epoll_create(1); for(i = 0; i < MAX_PORT; i++) // 建立100个端口进行监听,并且加入epoll init_port_and_listen(port + i, ra.epfd); while(1) { int nready = epoll_wait(ra.epfd, ra.events, MAX_EVENTS_NUM, -1); if(nready < 0) { printf("epoll_wait error.n"); break; } int i; for(i = 0; i < nready; i++) { si = ra.events[i].data.ptr; if(ra.events[i].events & (EPOLLIN | EPOLLOUT)) { if(si->callback != NULL) si->callback(si->sockfd, ra.events[i].events, si); // 调用回调函数 } } } }
此时要记得将客户端程序的宏定义MAX_PORT也设为100。
测试现象:为了减小客户端的压力并缩短测试时间,这里起三个客户端同时向服务端发起连接,每个客户端最终建立34W个连接,最终总体会有大于100W个连接数。
长时间运行后,虽然不再出现之前的错误,但连接数量继续增大后客户端会出现connect连接超时错误。
分析:是客户端端口不够吗?注意,一个socket包含一个五元组,其中 src port 和 dst port 是组合出现的,所以对于客户端socket的数量限制不是在于端口号(65535)的限制,而是五元组数量的限制。现在服务端 dst port 有100个,那么它与客户端 src port 的组合也可以有更多(100×65535)。此时连接数还不算多,显然不是客户端端口不够引起的。
可以想象,这是客户端调用connect()进行三次握手时,服务端没有返回ACK。为什么没有返回呢?
这里涉及到了 iptables 的作用,实际上网络协议栈也会对连接的数量进行限制,超出限制就不接收连接了。
通过vim /etc/sysctrl.conf可以修改这个限制,将这个字段net.nf_conntrack_max = 1048576开放出来并设置一个最大值。此外,这个配置文件里边还有一个字段为fs.file-max = 1048576,这是指的 fd 的最大值(是值,非个数)。关于 sysctrl.conf 的更多内容可以参考:sysctl.conf文件详解
(修改后的最终测试结果后面有条件再补充)
实际的做法实际中会采用直接fork出多进程进行监听,好处在于每个进程都有各自fd限制数量,互相不影响,因此可以监听地更多。
光使用epoll,靠着Linux操作系统就能把并发量开到百万级别了。
2 增加多线程处理 现象:处理时间存在明显抖动不难发现,每运行一段时间后(每建立1000个连接),会周期性地出现某个处理时间段比之前几个时间段长很多的情况,为什么会出现这么严重的抖动?
原因:单线程处理的情况下,半连接、全连接队列满了之后,其他连接需要排队等待。当队列中的客户端堆积到最大值后,必须等线程从队列中accept出连接,才有更多的队列空间去装后面的客户端。如果用多线程去处理,接收连接就会更快,抖动就会减小。
处理方法:多线程accept· 将 accept 与 recv、send分开。
· 增加多个线程去accept相同的监听socket fd,实现多点接入。


![[C/C++后端开发学习] 9 服务端百万并发测试 [C/C++后端开发学习] 9 服务端百万并发测试](http://www.mshxw.com/aiimages/31/430020.png)
