栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

[C/C++后端开发学习] 7 tcp服务器的epoll实现以及Reactor模型

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

[C/C++后端开发学习] 7 tcp服务器的epoll实现以及Reactor模型

tcp服务器的epoll实现以及Reactor模型
  • 1 IO多路复用
    • select
    • poll
    • epoll
  • 2 epoll详解
    • 2.1 基本使用方法
    • 2.2 LT水平触发和ET边沿触发
    • 2.3 实现服务器监听和接收数据
  • 3 Reactor模型
    • 3.1 增加回调函数实现Reactor模型
    • 3.2 实现服务器发送数据
    • 3.3 半包的数据怎么处理?
    • 3.4 多线程处理
      • 处理方法:
    • 3.5 多进程处理
      • 处理方法:
    • 3.6 Reactor参考案例
  • 其他杂项笔记

1 IO多路复用

IO多路复用简单地理解就是,一个线程(或进程)同时负责多个文件描述符fd的读写操作,它通过某种方式对这些fd进行监听,当内核发现指定的某个描述符就绪时,就通知线程进行读写。

select、poll和epoll是最常见的IO多路复用实现方式。基于此实现的服务器模型往往又被称为事件驱动模型。

select

select将需要监听的接口描述符标记到集合中(数组实现),包括读、写、异常3个集合,然后将这些集合传递给内核;内核轮询遍历集合中标记的描述符,并将就绪的描述符保留在这些集合中返回。应用程序从内核处拷贝回这些集合后,还需要再遍历一遍才能获知哪些描述符处于就绪的状态。

当描述符较大时,select()接口本身需要消耗大量时间去轮询各个句柄;其次,整个描述符集合在用户程序和内核之间来回拷贝也存在一定的开销;最后,应用程序还需要遍历一遍描述符集合才能获得各描述符的状态。可见,select 显然不是实现“事件驱动”的最好选择。

下面是select 接口的原型定义:

FD_ZERO(int fd, fd_set* fds)
FD_SET(int fd, fd_set* fds)
FD_ISSET(int fd, fd_set* fds)
FD_CLR(int fd, fd_set* fds)
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout)

其中 fd_set 类型按 bit 位标记句柄,比如设置了一个值为16的句柄(通过FD_SET() ),则fd_set 的第16位bit被设置为1,select调用将对其进行监视;select调用返回后,也需要检查fd_set 中的16号句柄(第16位bit)是否被标记为1(通过FD_ISSET() ),从而可判断对应的描述符是否可读写。

客户端的一个 connect() 操作,将在服务器端的监听socket激发一个“可读事件”;close()操作也是对相应连接的socket触发可读,只不过读操作返回的是0。

poll

poll的工作方式与select本质上差不多,只不过将内部管理描述符的数据结构由数组改为了链表,取消了最大可监控文件描述符数的限制,其他方面与select基本无异。

epoll

epoll 在内核中维护一个简易的文件系统,其中包含一个红黑树和一个就绪队列。每一个事件结构体都包含需要监听的描述符、监听的读写事件以及事件触发时的回调函数,这些事件会被挂到红黑树中,这样可以实现事件结构的快速查询,方便实现事件的修改和删除;就绪的描述符会被加入到等待队列中,因此应用程序不需要轮询,直接从队列中读走就绪的事件结构即可。

关于epoll的工作原理可以参考:Select、Poll、Epoll详解

与select相对地,1)epoll只需要将待监视的描述符信息拷贝一次给内核,不必每次调用前都设置一次;2)不必每次都遍历描述符集合,直接读取就绪的描述符事件结构即可;3)最大描述符数量没有限制。

不过,select是POSIX的,而不同的操作系统特供的 epoll 接口差异很大,因此跨平台能力就相对差一些,不过对于以Linux为主的服务器环境问题不大。

如果处理的连接数不是特别多的话,使用select/epoll 方案的服务器不一定就比使用多线程+阻塞IO 方案的性能更好,甚至可能延迟还更大。但对于连接数更多的情况下,select/epoll 等肯定具有优势。

2 epoll详解 2.1 基本使用方法
  • 创建一个epoll实例,返回该实例的描述符epfd:
int epoll_create(int size);		// size参数只有0和1的区别(Since Linux 2.6.8, the argument is ignored, but must be  greater  than  zero)
  • 设置epoll事件结构(注意了哈,epoll_data 是个**union**!)。
typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

struct epoll_event
{
  uint32_t events;	
  epoll_data_t data;	
} __EPOLL_PACKED;

其中,events 变量常用 EPOLLIN、EPOLLOUT、EPOLLET,可以通过‘“或”运算符|进行组合。fd 和 ptr 根据需要设置其一,一般使用 ptr 指向自定义的结构体,其中包含了fd 和回调函数,实例如下:

struct sockitem
{
	int sockfd;
	int (*callback)(int fd, int events, void *arg);
};
  • 添加事件到epoll实例中:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epfd : 就是epoll实例的描述符
op : EPOLL_CTL_ADD(增加事件)、EPOLL_CTL_MOD(修改事件)、EPOLL_CTL_DEL(删除事件)
event : 自然就是对应的事件结构体了
  • 获取就绪的触发事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

此处,events是一个输入/输出参数,传入的应该要是一个struct epoll_event数组;maxevents一般是数组大小;timeout是以毫秒为单位的超时事件,传入-1则永久阻塞,传入0则会立即返回;Linux的man中说的很清楚:

The call will block until either :
* a file descriptor delivers an event;
* the call is interrupted by a signal handler; or
* the timeout expires.

正常返回时,返回值为就绪的事件数目n,events数组中的前n个元素则存放了就绪的事件结构体。

2.2 LT水平触发和ET边沿触发

LT水平触发是 epoll 的默认触发方式,与select一样,读时只要 fd 对应的缓冲区中有数据,就一直触发通知应用程序 fd 可读,写时只要缓冲区中有空间就通知应用 fd 可写。而ET边沿触发是epoll特有的,读时只有当缓冲区中加入新的数据时,才触发一次,之后如果数据没有全部读走,及时缓冲区中还有数据,也不会再触发,直至缓冲区又送入了新数据才又触发一次。

要使用边沿触发的话,在epoll_event结构的变量events中设置EPOLLET即可。

一般的使用方法为:

  • ET + 循环读出全部数据
  • LT + 一次性读一大块数据

使用ET时一次触发往往需要通过循环去将所有数据读出,否则有可能导致死等,即:客户端发送数据后等待响应,但一次触发后服务端没有读出全部数据,不会响应客户端,客户端也不再发数据引起触发,于是二者就僵持住了。

大块数据不建议用ET+循环去读写,否则大量时间都会花费在某个大块数据的IO读上而无法处理其他IO。

而监听的 socket 适合用LT来处理,因为只要监听的队列中还有客户端连接请求就应马上处理。

2.3 实现服务器监听和接收数据
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

int main(int argc, char* argv[])
{
    int port = atoi(argv[1]);   // 传入一个参数作为端口号,此处对参数简单处理

    
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
		return -1;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);

    if(bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
        return -2;

    if(listen(sockfd, 5) < 0)
        return -3;
    

    
    int epfd = epoll_create(1);

    struct epoll_event ev;
    struct epoll_event events[512]; // wait时存储就绪的事件

    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN;    // 默认LT
    ev.data.fd = sockfd;

    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);    // 添加到事件到epoll

    while(1)
    {
        int nready = epoll_wait(epfd, events, 512, -1); // 阻塞wait
        if(nready < 0)
        {
            printf("epoll_wait error.n");
            break;
        }

        int i;
        for(i = 0; i < nready; i++)	// 遍历处理就绪的事件
        {
            if(events[i].events & EPOLLIN)  // 客户端connect是可读事件 EPOLLIN; 客户端关闭连接,也是 EPOLLIN,只是读时返回0
            {
                if(events[i].data.fd == sockfd) // 如果是监听套接字的事件(对应需要调用accept)
                {
                    struct sockaddr_in client;
                    memset(&client, 0, sizeof(struct sockaddr_in));
                    socklen_t caddr_len = sizeof(struct sockaddr_in);

                    int clientfd = accept(sockfd, (struct sockaddr*)&client, &caddr_len);
                    if(clientfd < 0)
                    {
                        printf("# accept errorn");
                        continue;
                    }

                    char str[INET_ADDRSTRLEN] = {0};
                    printf("recv from %s at port %dn", inet_ntop(AF_INET, &client.sin_addr, str, sizeof(str)),
                        ntohs(client.sin_port));

                    memset(&ev, 0, sizeof(struct epoll_event));
                    ev.events = EPOLLIN;
                    ev.data.fd = clientfd;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);  // 把客户端socket增加到epoll中监听
                }
                else // 如果是非监听套接字的其他已经连接的客户端的事件
                {
                    int clientfd = events[i].data.fd;
                    char buffer[1024] = {0};
                    int ret = recv(clientfd, buffer, 1024, 0);
                    if(ret <= 0)
                    {
                        if(ret < 0)
                        {
                            if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                            {   // 被打断直接返回的情况
                                continue;
                            }
                        }
                        else
                        {   // 返回0说明对端关闭连接
                            printf("# client disconnected...n");
                        }
                        
                        
                        close(clientfd);
                        ev.events = EPOLLIN;
                        ev.data.fd = clientfd;
                        epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);  
                    }
                    else
                    {
                        printf("# recv data from fd:%d : %s , len = %dn", clientfd, buffer, ret);
                    }
                }
            }

            if(events[i].events & EPOLLOUT)
            {
                // TODO
            }

        }
    }
}
3 Reactor模型

Reactor 的原义为“反应堆”,是一种事件驱动机制。其特殊之处在于,应有程序并非直接调用某个API进行IO处理,而是将IO处理对应的事件处理函数注册到 Reactor 上,当事件触发时,由 Reactor 去调用事件处理函数进行IO处理。这些事件处理函数就是所谓的“回调函数”。

Reactor 模型有三个重要的组件:
- 多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。
- 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。
- 事件处理器:负责处理特定事件的处理函数。

Reactor 模型通常是单线程的,其设计目标是单线程使用一颗 CPU 的全部资源,这样做的附带好处在于:不必考虑共享资源的互斥访问。

同时它也具有一定的可扩展性,对于多核的机器,可以增加 Reactor 实例个数,使每个核上运行一个反应堆。当然这些Reactor 实例处理的请求也应是互不相关的,这适用于一些需要为简单的访问提供并发服务的场景。例如 Nginx 这样的 http 静态服务器。

几乎所有的Linux服务器低层都是采用基于epoll的Reactor模型实现的。

用一句话总结Reactor:由epoll等最基础的对IO进行管理的思路转换为对事件进行管理的思路。

3.1 增加回调函数实现Reactor模型
struct sockitem
{
	int sockfd;
	int (*callback)(int fd, int events, void *arg);
    int epfd;
};

#define MAX_EVENTS_NUM 512

struct reactor
{
    int epfd;
    struct epoll_event events[MAX_EVENTS_NUM];
};


int send_cb(int fd, int events, void *arg)
{
	
    return 0;
}


int recv_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    int clientfd = si->sockfd;
    char buffer[1024] = {0};
    int ret = recv(clientfd, buffer, 1024, 0);
    if(ret <= 0)
    {
        if(ret < 0)
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
            {   // 被打断直接返回的情况
                return ret;
            }
        }
        else
        {
            printf("# client disconnected...n");
        }
        
        
        close(clientfd);
        ev.events = EPOLLIN;
        ev.data.ptr = si;
        epoll_ctl(si->epfd, EPOLL_CTL_DEL, clientfd, &ev);  
        free(si);
    }
    else
    {
        printf("# recv data from fd:%d : %s , len = %dn", clientfd, buffer, ret);
    }

    return ret;
}


int accept_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    struct sockaddr_in client;
    memset(&client, 0, sizeof(struct sockaddr_in));
    socklen_t caddr_len = sizeof(struct sockaddr_in);

    int clientfd = accept(si->sockfd, (struct sockaddr*)&client, &caddr_len);
    if(clientfd < 0)
    {
        printf("# accept errorn");
        return clientfd;
    }

    char str[INET_ADDRSTRLEN] = {0};
    printf("recv from %s at port %dn", inet_ntop(AF_INET, &client.sin_addr, str, sizeof(str)),
        ntohs(client.sin_port));

    struct sockitem *client_si = (struct sockitem*)malloc(sizeof(struct sockitem));
    client_si->sockfd = clientfd;
    client_si->callback = recv_cb;  // accept完的下一步就是接收客户端数据
    client_si->epfd = si->epfd;

    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN;
    ev.data.ptr = client_si;
    epoll_ctl(si->epfd, EPOLL_CTL_ADD, clientfd, &ev);  // 把客户端socket增加到epoll中监听

    return clientfd;
}

int main(int argc, char* argv[])
{
    int port = atoi(argv[1]);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
		return -1;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);

    if(bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
        return -2;

    if(listen(sockfd, 5) < 0)
        return -3;

    
    struct reactor ra;
    ra.epfd = epoll_create(1);

    struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));    // 自定义数据,用于传递给回调函数
    si->sockfd = sockfd;
    si->callback = accept_cb;
    si->epfd = ra.epfd; // sockitem 中增加一个epfd成员以便回调函数中使用

    struct epoll_event ev;
    memset(&ev, 0, sizeof(struct epoll_event));
    ev.events = EPOLLIN;    // 默认LT
    ev.data.ptr = si;

    epoll_ctl(ra.epfd, EPOLL_CTL_ADD, sockfd, &ev);    // 添加到事件到epoll

    while(1)
    {
        int nready = epoll_wait(ra.epfd, ra.events, MAX_EVENTS_NUM, -1);
        if(nready < 0)
        {
            printf("epoll_wait error.n");
            break;
        }

        int i;
        for(i = 0; i < nready; i++)
        {
            si = ra.events[i].data.ptr;
            if(ra.events[i].events & (EPOLLIN | EPOLLOUT))
            {
                if(si->callback != NULL)
                    si->callback(si->sockfd, ra.events[i].events, si);  // 调用回调函数
            }
        }
    }
}

上面的代码中,定义了一个sockitem结构体来作为event结构内的自定义数据,以便于在回调时区分不同的事件并方便在回调函数中使用事件的参数如socket 描述符等。此时还没有实现 send 操作的回调。

3.2 实现服务器发送数据

接收到客户端数据后,最简单的响应方式就是在接收回调函数recv_cb的最后调用send。但这么处理存在问题:
发送大量数据时,socket的发送缓冲区有可能是满的,此时send会返回-1。那我们可以增加循环去发送吗?不能,否则线程可能长时间停留在此处理该send过程,其他IO请求都没法处理了。

正确的处理方式:recv完之后,将socket在epoll中的可触发事件设为可写,在写数据的回调中send完数据后,再将其设为可读。如此循环运行下去。

修改读和写的回调函数如下:

int recv_cb(int fd, int events, void *arg);


int send_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    int clientfd = si->sockfd;
    
    
    char* hello = "Hello!n";
    int ret = send(clientfd, hello, strlen(hello), 0);

    si->callback = recv_cb; // 切回读的回调
    ev.events = EPOLLIN;
    ev.data.ptr = si;
    epoll_ctl(si->epfd, EPOLL_CTL_MOD, si->sockfd, &ev);

    return ret;
}


int recv_cb(int fd, int events, void *arg)
{
    struct sockitem *si = arg;
    struct epoll_event ev;

    int clientfd = si->sockfd;
    char buffer[1024] = {0};
    int ret = recv(clientfd, buffer, 1024, 0);
    if(ret <= 0)
    {
        if(ret < 0)
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
            {   // 被打断直接返回的情况
                return ret;
            }
        }
        else
        {
            printf("# client disconnected...n");
        }
        
        
        close(clientfd);
        ev.events = EPOLLIN;
        ev.data.ptr = si;
        epoll_ctl(si->epfd, EPOLL_CTL_DEL, clientfd, &ev);  
        free(si);
    }
    else
    {
        printf("# recv data from fd:%d : %s , len = %dn", clientfd, buffer, ret);
        
        

        si->callback = send_cb;     // 回调函数要切换成写回调
        struct epoll_event ev;
        ev.events = EPOLLOUT | EPOLLET; // 写的时候最好还是用ET
        ev.data.ptr = si;
        epoll_ctl(si->epfd, EPOLL_CTL_MOD, si->sockfd, &ev);
    }

    return ret;
}

3.3 半包的数据怎么处理?

TCP半包问题:数据量很大,一次回调读取或写入时没有处理完,希望下次回调时继续处理。

将各个socket的接收缓冲和发送缓冲都封装到各自的 sockitem 中,这样就能各自读写各的了。

#define MAX_BUFFER_SIZE 1024
struct sockitem
{
	int sockfd;
	int (*callback)(int fd, int events, void *arg);
    int epfd;

    char recvbuffer[MAX_BUFFER_SIZE]; // 接收缓冲
	char sendbuffer[MAX_BUFFER_SIZE]; // 发送缓冲
    int recvlength; // 接收缓冲区中的数据长度
    int sendlength; // 发送缓冲区中的数据长度
};
3.4 多线程处理

问题:当大量客户端同时发起 connect 时,有多少个客户端接入,epoll_wait 就要返回多少次(因为一次只能accept一个客户端),这将导致接入过程影响到其他IO请求的处理。

处理方法:

一个线程专门用于处理 accept,将返回的socket描述符注册到epoll实例中;另一个线程处理同一个epoll实例中的读写触发事件。

缺点:数据共享就避免不了要通过加锁来处理线程安全问题。

3.5 多进程处理

前提:解决好客户端关联数据(session)的存储和共享问题。因为多进程间并不直接共享内存资源,使用共享内存的方式会很麻烦。

处理方法:

多进程如何监听同一个端口?—— 执行完 listen 之后再fork。

3.6 Reactor参考案例

单线程Reactor——libevent、redis
多线程Reactor——memcached
多进程Reactor——nginx

其他杂项笔记

udp能否实现并发服务器?
udp无法保证数据达到的顺序。所以不能简单地采用在数据包上增加一层协议头的方式来区分客户端。
正确的做法:模拟tcp三次握手,握手后为每个客户端分配一个fd

socket是什么?
socket由两部分组成,1是fd,2是五元组–sip,dip,sport,dport,传输层协议(tcp/udp)

关于sigio

  • sigio允许为描述符安装一个信号处理函数,操作该描述符的进程发起读写操作后可以继续运行而不阻塞;等到数据准备好后,内核会给进程发送一个SIGIO信号,进程通过异步回调的方式进入到信号处理函数去处理数据。

  • sigio 很难看到在实际中应用。

  • udp 实现并发服务器可以使用sigio来接收(异步回调函数中调用recvfrom),但tcp不行,因为tcp接收流数据时会产生大量sigio,包括accept、recv都会产生sigio,但信号处理函数却没有传入 fd 因而无法区分。

信号是如何发送的?

  1. 进程的信号集合保存在task_struct 中
    task_struct -> sighand_struct (Sched.h)-> sigaction–>action[64] , SIGIO是29

  2. 调用signal时信号如何绑定到进程
    系统调用 SYSCALL_DEFINE2(signal, …) -> do_sigaction() -> 设置该进程的sighand->action[sig-1]

  3. 信号如何发送
    kill 系统调用发送信号 -> signalfd_notify() -> 激活等待队列(条件等待,等待sigio位满足) --> 回调

服务端出现大量close_wait是什么情况?
客户端关闭连接后,服务端没有及时关闭socket

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/352990.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号