栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

[C/C++后端开发学习] 9 服务端百万并发测试

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

[C/C++后端开发学习] 9 服务端百万并发测试

服务端百万并发测试
  • 服务端并发的概念
    • 常见单机服务模型
  • 并发测试方法
    • socket数量的限制(描述符数量的限制)
    • 客户端测试代码
    • 测试结果
  • 改进代码提升测试并发量
    • 1 单线程多端口同时监听
      • 修改代码:
      • 测试现象:
      • 分析:
      • 解决方法:
      • 实际的做法
    • 2 增加多线程处理
      • 现象:处理时间存在明显抖动
      • 原因:
      • 处理方法:多线程accept

服务端并发的概念

并发量:同时承载的客户端的数量,实质上就是同时能够维护的 socket 的数量

比如要实现10W客户端同时在线,200ms内正常返回结果,承载量的影响因素:

  • 数据库
  • 网络带宽
  • 内存操作
  • 日志

测试服务器十万、百万并发能力的意义更多地在于测试服务器是否具备这种能力,它并不是与业务直接相关的。

常见单机服务模型
  • 单线程同步:NTP
  • 多线程同步:Natty
  • 纯异步:Redis、HAProxy
  • 半同步半异步:Natty
  • 多进程同步:fastcgi
  • 多线程异步:memcached
  • 多进程异步:Nginx
  • 一个请求一个进程/线程:Apache/CGI
  • 微进程框架:erlang/go/lua
  • 协程框架:libco/ntyco
并发测试方法

实际上,我们之前基于epoll实现的Reactor模型的简单echo服务器就应经可以实现几十万量级的并发了,我们可以测试一下。测试之前需要先修改一下描述符数量的限制,否则可用的描述符资源会耗尽。服务端测试代码为之前这篇博文中的Reactor模型的代码。

socket数量的限制(描述符数量的限制)

两种修改方法:

  • ulimit -n
    这种方法不能持久,重启后就得重新设置

  • vim /etc/security/limits.conf
    limits.conf文件的格式如下:

    • domain就是指用户;
    • 限制策略的type分soft软和hard硬,所谓hard就是硬性限制,数量达到value这个上限就不分配了;soft相对宽松,达到上限后开始回收;
    • item:对于文件描述符的数量限制,其对应的item就是 nofile - max number of open files

    增加两行行:

*	soft	nofile	1048576
*	hard	nofile	1048576
客户端测试代码
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define MAX_CONNECTION  340000  // 单个客户端建立34W个连接,3个客户端就有100W
#define MAX_BUFSIZE		128
#define MAX_EPOLLSIZE	(384*1024)
#define MAX_PORT		100     // 最大端口数量

#define TIME_MS_USED(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)  // 用于计算耗时


static int fdSetNonBlock(int fd)
{
    int flags;

    flags = fcntl(fd, F_GETFL);
    if(flags < 0) return flags;

    flags |= O_NONBLOCK;
    if(fcntl(fd, F_SETFL, flags) < 0) return -1;

    return 0;
}


static int sockSetReuseAddr(int sockfd)
{
    int reuse = 1;
    return setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
}


int main(int argc, char *argv[])
{
    if(argc < 3)
    {
        printf("Usage: %s ip port", argv[0]);
        return 0;
    }

    const char* ip = argv[1];
    int port = atoi(argv[2]);
    int connections = 0;    // 建立连接的计数,用于统计
    char buffer[MAX_BUFSIZE] = {0};
    int i;
    int clientFinishTest = 0;
    int portOffset = 0;

    struct epoll_event events[MAX_EPOLLSIZE];
    int epfd = epoll_create(1);

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = inet_addr(ip);

    struct timeval loopBegin, loopEnd;
    gettimeofday(&loopBegin, NULL);

    while(1)
    {
        struct epoll_event ev;
		int sockfd = 0;

        if(connections < MAX_CONNECTION)    // 连接数量未达到最大值就一直新建连接并connect到服务端
        {
            sockfd = socket(AF_INET, SOCK_STREAM, 0);
            if(sockfd < 0)
            {
                perror("socket");
                goto errExit;
            }

            addr.sin_port = htons(port + portOffset);
            portOffset = (portOffset + 1) % MAX_PORT;   // 均匀地使用这100个端口

            if(connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
            {
                perror("connect");
                goto errExit;
            }

            fdSetNonBlock(sockfd);
            sockSetReuseAddr(sockfd);

            // 发送数据送缓冲区
            snprintf(buffer, MAX_BUFSIZE - 1, "Hello, this is client No.%d port %d ", connections, port + portOffset);
            send(sockfd, buffer, strlen(buffer), 0);

            ev.events = EPOLLIN | EPOLLOUT;
            ev.data.fd = sockfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);    // 当前连接添加到epoll中进行监视

            connections++; // 连接数增加
        }

        // 每增加1000个连接就执行一次
        if(connections % 1000 == 0 || connections == MAX_CONNECTION)
        {
            if(!clientFinishTest)   // 当前客户端的测试尚未结束
            {
                
                gettimeofday(&loopEnd, NULL);
                int msPer1K = TIME_MS_USED(loopEnd, loopBegin);
                gettimeofday(&loopBegin, NULL); // 进入下一轮计数

                printf("-- connections: %d, sockfd:%d, time_used:%dn", connections, sockfd, msPer1K);
            }
            int nready = epoll_wait(epfd, events, connections, 100);
            for(i = 0; i < nready; i++)
            {
                int clientfd = events[i].data.fd;
                if (events[i].events & EPOLLOUT)
                {
					sprintf(buffer, "data from %d", clientfd);
					send(sockfd, buffer, strlen(buffer), 0);
				}
                else if (events[i].events & EPOLLIN) 
                {
					char rBuffer[MAX_BUFSIZE] = {0};				
					ssize_t length = recv(sockfd, rBuffer, MAX_BUFSIZE, 0);
					if (length > 0) 
                    {
						printf("# Recv from server:%sn", rBuffer);
					} 
                    else if (length == 0) 
                    {
						printf("# Disconnected. clientfd:%dn", clientfd);
						connections --;
						close(clientfd);
                        epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &events[i]);
					} 
                    else 
                    {
						if (errno == EINTR || errno == EAGAIN) continue;

						printf(" Error clientfd:%d, errno:%dn", clientfd, errno);
						close(clientfd);
                        epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &events[i]);
					}
				} 
                else 
                {
					printf(" clientfd:%d, unknown events:%dn", clientfd, events[i].events);
					close(clientfd);
                    epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &events[i]);
				}
            }

            if(connections == MAX_ConNECTION && !clientFinishTest)
            {
                printf("# Client finished the test.n");
                clientFinishTest = 1;
            }
        }

        usleep(1000);   // 短暂休眠1ms再继续
    }

    return 0;

errExit:
	printf("error : %sn", strerror(errno));
	return 0;
}

此时,宏定义MAX_PORT设为1,服务端只开启一个端口。
同时,我们先限制了客户端建立的连接数量为34W,看看此时单个客户端对服务器是否能够达到这个量级。

测试结果

此时服务端只有一个端口监听时,而客户端的可用端口是有限的,因此当端口数量达到一定值后(此处为2.8W)就会报Cannot assign requested address的错误。

改进代码提升测试并发量 1 单线程多端口同时监听

为了进一步增加可建立的连接数量,在服务端起100个监听端口并将所有监听 socket 描述符丢到epoll中,客户端循环连接到这100个端口。为什么这么做呢?可以看下文关于socket五元组的分析。

修改代码:

修改服务器端的代码如下:

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define MAX_PORT		100		// 建立的最大端口数

#define MAX_BUFFER_SIZE 1024
struct sockitem
{
	int sockfd;
	int (*callback)(int fd, int events, void *arg);
    int epfd;

    char recvbuffer[MAX_BUFFER_SIZE]; // 接收缓冲
	char sendbuffer[MAX_BUFFER_SIZE]; // 发送缓冲
    int recvlength; // 接收缓冲区中的数据长度
    int sendlength; // 发送缓冲区中的数据长度
};

#define MAX_EVENTS_NUM 512

struct reactor
{
    int epfd;
    struct epoll_event events[MAX_EVENTS_NUM];
};

int recv_cb(int fd, int events, void *arg);


int send_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    int clientfd = si->sockfd;
    
    
    int ret = send(clientfd, si->sendbuffer, si->sendlength, 0);

    si->callback = recv_cb; // 切回读的回调
    ev.events = EPOLLIN;
    ev.data.ptr = si;
    epoll_ctl(si->epfd, EPOLL_CTL_MOD, si->sockfd, &ev);

    return ret;
}


int recv_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    int clientfd = si->sockfd;
    int ret = recv(clientfd, si->recvbuffer, MAX_BUFFER_SIZE, 0);
    if(ret <= 0)
    {
        if(ret < 0)
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
            {   // 被打断直接返回的情况
                return ret;
            }
        }
        else
        {
            printf("# client disconnected...n");
        }
        
        
        close(clientfd);
        ev.events = EPOLLIN;
        ev.data.ptr = si;
        epoll_ctl(si->epfd, EPOLL_CTL_DEL, clientfd, &ev);  
        free(si);
    }
    else
    {
        //printf("# recv data from fd:%d : %s , len = %dn", clientfd, si->recvbuffer, ret);
        si->recvlength = ret;
        memcpy(si->sendbuffer, si->recvbuffer, si->recvlength); // 将接收到的数据拷贝的发送缓冲区
        si->sendlength = si->recvlength;

        si->callback = send_cb;     // 回调函数要切换成写回调
        struct epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET; // 写的时候最好还是用ET
        ev.data.ptr = si;
        epoll_ctl(si->epfd, EPOLL_CTL_MOD, si->sockfd, &ev);
    }

    return ret;
}


int accept_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    struct sockaddr_in client;
    memset(&client, 0, sizeof(struct sockaddr_in));
    socklen_t caddr_len = sizeof(struct sockaddr_in);

    int clientfd = accept(si->sockfd, (struct sockaddr*)&client, &caddr_len);
    if(clientfd < 0)
    {
        printf("# accept errorn");
        return clientfd;
    }

    char str[INET_ADDRSTRLEN] = {0};
    printf("recv from %s:%dn", inet_ntop(AF_INET, &client.sin_addr, str, sizeof(str)),
        ntohs(client.sin_port));

    struct sockitem *client_si = (struct sockitem*)malloc(sizeof(struct sockitem));
    client_si->sockfd = clientfd;
    client_si->callback = recv_cb;  // accept完的下一步就是接收客户端数据
    client_si->epfd = si->epfd;

    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN;
    ev.data.ptr = client_si;
    epoll_ctl(si->epfd, EPOLL_CTL_ADD, clientfd, &ev);  // 把客户端socket增加到epoll中监听
    return clientfd;
}

int init_port_and_listen(int port, int epfd)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
		return -1;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);

    if(bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
        return -2;
    if(listen(sockfd, 5) < 0)
        return -3;

    
    struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));    // 自定义数据,用于传递给回调函数
    si->sockfd = sockfd;
    si->callback = accept_cb;
    si->epfd = epfd; // sockitem 中增加一个epfd成员以便回调函数中使用

    struct epoll_event ev;
    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN;    // 默认LT
    ev.data.ptr = si;
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);    // 添加到事件到epoll
    return sockfd;
}

int main(int argc, char* argv[])
{
    int port = atoi(argv[1]);
    int i;
    struct sockitem *si;
    
    struct reactor ra;
    ra.epfd = epoll_create(1);

    for(i = 0; i < MAX_PORT; i++)   // 建立100个端口进行监听,并且加入epoll
        init_port_and_listen(port + i, ra.epfd);

    while(1)
    {
        int nready = epoll_wait(ra.epfd, ra.events, MAX_EVENTS_NUM, -1);
        if(nready < 0)
        {
            printf("epoll_wait error.n");
            break;
        }

        int i;
        for(i = 0; i < nready; i++)
        {
            si = ra.events[i].data.ptr;
            if(ra.events[i].events & (EPOLLIN | EPOLLOUT))
            {
                if(si->callback != NULL)
                    si->callback(si->sockfd, ra.events[i].events, si);  // 调用回调函数
            }
        }
    }
}

此时要记得将客户端程序的宏定义MAX_PORT也设为100。

测试现象:

为了减小客户端的压力并缩短测试时间,这里起三个客户端同时向服务端发起连接,每个客户端最终建立34W个连接,最终总体会有大于100W个连接数。

长时间运行后,虽然不再出现之前的错误,但连接数量继续增大后客户端会出现connect连接超时错误。

分析:

是客户端端口不够吗?注意,一个socket包含一个五元组,其中 src port 和 dst port 是组合出现的,所以对于客户端socket的数量限制不是在于端口号(65535)的限制,而是五元组数量的限制。现在服务端 dst port 有100个,那么它与客户端 src port 的组合也可以有更多(100×65535)。此时连接数还不算多,显然不是客户端端口不够引起的。

可以想象,这是客户端调用connect()进行三次握手时,服务端没有返回ACK。为什么没有返回呢?
这里涉及到了 iptables 的作用,实际上网络协议栈也会对连接的数量进行限制,超出限制就不接收连接了。

解决方法:

通过vim /etc/sysctrl.conf可以修改这个限制,将这个字段net.nf_conntrack_max = 1048576开放出来并设置一个最大值。此外,这个配置文件里边还有一个字段为fs.file-max = 1048576,这是指的 fd 的最大值(是值,非个数)。关于 sysctrl.conf 的更多内容可以参考:sysctl.conf文件详解

(修改后的最终测试结果后面有条件再补充)

实际的做法

实际中会采用直接fork出多进程进行监听,好处在于每个进程都有各自fd限制数量,互相不影响,因此可以监听地更多。

光使用epoll,靠着Linux操作系统就能把并发量开到百万级别了。

2 增加多线程处理 现象:处理时间存在明显抖动

不难发现,每运行一段时间后(每建立1000个连接),会周期性地出现某个处理时间段比之前几个时间段长很多的情况,为什么会出现这么严重的抖动?

原因:

单线程处理的情况下,半连接、全连接队列满了之后,其他连接需要排队等待。当队列中的客户端堆积到最大值后,必须等线程从队列中accept出连接,才有更多的队列空间去装后面的客户端。如果用多线程去处理,接收连接就会更快,抖动就会减小。

处理方法:多线程accept

· 将 accept 与 recv、send分开。
· 增加多个线程去accept相同的监听socket fd,实现多点接入。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/430020.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号