栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

【Linux】基于Reactor模式的epoll ET简易服务器

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Linux】基于Reactor模式的epoll ET简易服务器

文章目录
    • 什么是Reactor
    • 服务器实现
      • server.cc
      • reactor.hpp
      • appInterface.hpp
      • util.hpp

什么是Reactor

Reactor 释义“反应堆”,是一种事件驱动机制

和普通函数调用的不同之处在于:

  • 应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor上, 如果相应的事件发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”
  • Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上
  • 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中
  • Reactor架构模式允许事件驱动的应用通过多路分发的机制去处理来自不同客户端的多个请求
  • Reactor可以理解为I/O管理集合,和多线程、多进程没有关系。但后续可以额外在reactor中引入多线程

转载自https://blog.csdn.net/qq_21539375/article/details/123561321

服务器实现

除了listen_sock用accepter处理,其他clientfd用recver和senerd处理

以前是通过epoll对I/O进行管理,而现在通过reactor对事件进行管理,可以把reactor理解为一种设计模式

server.cc
#include "reactor.hpp"
#include "socket.hpp"
#include "appInterface.hpp"
#include "util.hpp"
#include 
static void Usage(std::string proc) {
    std::cerr << "Usage :" << "nt" << proc << " port " << std::endl;
}

int main(int argc, char* argv[]) {
    if (argc != 2) {
        Usage(argv[0]);
        exit(4);
    }

    //套接字相关
    int port = atoi(argv[1]);
    int listen_sock = ns_sock::Sock::Socket();

    //设置listen_sock 为非阻塞状态
    ns_until::SetNoBlock(listen_sock);

    ns_sock::Sock::Bind(listen_sock, port);
    ns_sock::Sock::Listen(listen_sock, BACKLOG);

    //事件管理器相关
    ns_epoll::Reactor reactor;
    reactor.InitReactor();

    ns_epoll::EventItem item;
    item.sock = listen_sock;
    item.R = &reactor;



    item.ManagerCallBack(ns_appinterface::accepter, nullptr, nullptr);

    //将listen_sock托管给reactor//使用ET模式
    reactor.AddEvent(listen_sock, EPOLLIN|EPOLLET, item);

    int timeout = 1000;
    while (true) {
        reactor.Distatch(timeout);
    }
}

reactor.hpp
#pragma once
#include "socket.hpp"

#include 
#include 
#include 


#define BACKLOG 5
#define SIZE 256
#define MAXNUM 64

namespace ns_epoll {

    class Reactor; 
    class EventItem;

    typedef int(*callback_t)(EventItem*);

    //当成结构体使用
    class EventItem {
    public:
        //与通信相关
        int sock;

        //回指Reactor
        Reactor* R;

        //有关数据处理的回调函数,用来进行逻辑解耦!  应用数据就绪等通信细节和数据的处理模块使用该方法进行解耦!
        callback_t recv_handler;
        callback_t send_handler;
        callback_t error_handler; 

        std::string inbuffer; //读取到的数据缓冲区
        std::string outbuffer;//待发送的数据缓冲区
    public:
        EventItem() :sock(0), R(nullptr), recv_handler(nullptr), send_handler(nullptr), error_handler(nullptr) {}
        
        //管理回调
        void ManagerCallBack(callback_t _recv,callback_t _send,callback_t _err) {
            recv_handler = _recv;
            send_handler = _send;
            error_handler = _err;
        }

        ~EventItem() {
            //TODO
        }

    };

    class Reactor {
    private:
        int epfd;

        std::unordered_map event_item; //sock:EventItem

    public:
        Reactor() :epfd(-1) {}

    public:
        void InitReactor() {
            
            //创建epoll模型
            if ((epfd = epoll_create(SIZE)) < 0) {
                std::cerr << "epoll_creat error!!" << std::endl;
                exit(4);
            }
            std::cout << "server start" << std::endl;
        }


        //事件分派器
        void Distatch(int timeout) {
            //如果底层事件就绪,就把对应的事件分派给指定的回调函数进行统一处理
            
            struct epoll_event revs[MAXNUM];

            //返回值num表示就绪的事件数,内核会将就绪事件依次放入revs中
            int num = epoll_wait(epfd, revs, MAXNUM, timeout);

            for (int i = 0; i < num; ++i) {
                int sock = revs[i].data.fd;
                uint32_t mask = revs[i].events;

                
                //把所有的异常事件统一交给read,write处理
                //if ((revs[i].events & EPOLLERR || revs[i].events & EPOLLHUP)) mask | (EPOLLIN | EPOLLOUT);
                //异常事件发生 或者对端关闭
                if (revs[i].events & EPOLLERR || revs[i].events & EPOLLHUP) {
                    //如果error_handler被设置,就直接调用
                    if (event_item[sock].error_handler) event_item[sock].error_handler(&event_item[sock]);
                }
                //读事件发生
                if (revs[i].events & EPOLLIN) {
                    //如果recv_handler被设置,就直接调用 
                    if (event_item[sock].recv_handler) event_item[sock].recv_handler(&event_item[sock]);
                }
                //写事件发生
                if (revs[i].events & EPOLLOUT) {
                    //如果send_handler被设置,就直接调用
                    if (event_item[sock].send_handler) event_item[sock].send_handler(&event_item[sock]);
                }   

            }
        }

        bool AddEvent(int sock, int event, const EventItem& item) {
            struct epoll_event ev;
            ev.events = 0;
            ev.events |= event;
            ev.data.fd = sock;
            if (epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev) < 0) {
                std::cerr << "epoll_ctr error  fd:" << sock << std::endl;
                return false;
            }

            event_item.insert({ sock,item });

            std::cout << "debug  添加 :" << sock << " 到epoller中,成功" << std::endl;
            return true;
        }
        bool DelEvent(int sock) {
            if (epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr) < 0) {
                std::cerr << "epoll_ctl error  fd:" << sock << std::endl;
                return false;
            }

            event_item.erase(sock);
            return true;
        }

        //
        void EnableReadWrite(int sock,bool read,bool write) {
            struct epoll_event evt;
            evt.data.fd = sock;
            evt.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
            if (epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &evt)) {
                std::cerr << "epoll_ctl_mod error , fd: " << sock << std::endl;
            }
        }

        ~Reactor() {
            if (epfd >= 0) close(epfd);
        }
    };
}

appInterface.hpp
#pragma once
#include "reactor.hpp"
#include "util.hpp"
#include
#include 


namespace ns_appinterface {

	int recver(ns_epoll::EventItem* item);
	int sender(ns_epoll::EventItem * item);
	int errorer(ns_epoll::EventItem * item);

	int accepter(ns_epoll::EventItem* item) {
		std::cout << "get a new link [fd:" << item->sock <<"]" << std::endl;
		while (true) {
			struct sockaddr_in peer;
			socklen_t len = sizeof(peer);
			int sock = accept(item->sock, (struct sockaddr*)&peer, &len);

			if (sock < 0) {
				if (errno == EAGAIN || errno == EWOULDBLOCK) {
					//并没有读取出错,只是底层没有连接了
					return 0;
				}
				if (errno == EINTR) {
					//读取过程被信号打断了
					continue;
				}
				else {
					//出错了
					return -1; 
				}
			}
			//成功读取完毕
			else {

				ns_until::SetNoBlock(sock);


				ns_epoll::EventItem tmp;
				tmp.sock = sock;
				tmp.R = item->R;
				tmp.ManagerCallBack(recver, sender, errorer);

				ns_epoll::Reactor* reactor = item->R;
			
 
				//epoll使用时,经常会设置读事件就绪,写事件就绪按需打开 
				reactor->AddEvent(sock, EPOLLIN | EPOLLET, tmp);
			}
		}
		return 0;
	}


	//return 0 success
	//return -1 error
	int recver_helper(int sock, std::string* out) {
		while (true) {
			char buffer[128];
			ssize_t size = recv(sock, buffer, sizeof(buffer) - 1, 0);
			if (size < 0) {
				if (errno == EAGAIN || errno == EWOULDBLOCK) {
					//循环读取,读取完毕了
					return  0;
				}
				else if (errno == EINTR) {
					//被信号中断,继续读
					continue;
				}
				else {
					//读取出错 ,//TODO
					return -1;
				}
			}
			else {
				buffer[size] = 0;
				*out += buffer; //将每次读取到的数据追加到inbuffer中
			}
		}
	}

	int recver(ns_epoll::EventItem* item) {
		std::cout << "recv event ready: " << item->sock << std::endl;
		//负责数据读取
		//1.需要整体读,非阻塞
		if (recver_helper(item->sock, &item->inbuffer) < 0) {
			//item->error_handler;
			return -1;
		}
		std::cout << "client#" << item->inbuffer << std::endl;
		
		//2.根据发来的数据流,进行包和包之间的分离,防止粘包
		std::vector messages;
		ns_until::StringUtil::Split(item->inbuffer, &messages, "X");

		//3.针对一个一个的报文,协议反序列化decode
		//1+1X2*2X5%5X
		struct data {
			int x;
			int y;
		};

		//已经拿到了所有的数据报文
		for (auto s : messages) {
			struct data d;
			ns_until::StringUtil::Deserialize(s, &d.x, &d.y);
			std::cout << d.x << " : " << d.y << std::endl;

			//可以接入线程池
			//Task t(d)
			//threadpool->push

			//4.业务处理
			int z = d.x + d.y;

			//5.形成响应报文,序列化转化成为一个字符串
			std::string response;
			response += std::to_string(d.x);
			response += "+";
			response += std::to_string(d.y);
			response += "=";
			response += std::to_string(z);

			//响应报文追加进发送缓冲区
			item->outbuffer += response;
			//5.1 设置响应报文之间的分隔符
			item->outbuffer += "X"; //encode
		}
		//6.写回
		if(!item->outbuffer.empty())
			item->R->EnableReadWrite(item->sock, true, true);


		return 0;
	}

	//return 0 : 写完inbuffer
	//return 1 :缓冲区打满,下次再写
	//return -1 : error
	int sender_helper(int sock, std::string& in) {

		size_t total = 0;

		while (true) {
			ssize_t s = send(sock, in.c_str() + total, in.size() - total, 0);

			if (s > 0) {
				total += s;
				if (total >= in.size()) {
					//已经写完
					return 0;
				}
			}
			else if(s < 0) {
				if (errno == EAGAIN || errno == EWOULDBLOCK) {
					//无论是否发送完inbuffer,都需要将已经发送的数据,全部移出缓冲区
					in.erase(total);

					return  1;//已经将缓冲区写满了,不能再写入了,但是不知道有没有写完
				}
				else if (errno == EINTR) {
					//被信号中断,继续写
					continue;
				}
				else {
					//写失败 ,//TODO
					return -1;
				}
			}
			else {
				//TODO
			}
		}
	}

	int sender(ns_epoll::EventItem* item) {
		int hret = sender_helper(item->sock, item->outbuffer);
		if (hret == 0) {
			//已经发完
			//关闭写
			item->R->EnableReadWrite(item->sock, true, false);
		}
		else if(hret == 1) {
			item->R->EnableReadWrite(item->sock, true, true);
		}
		else {
			//item->error_handler(item);
		}
		return 0;
	}
	int errorer(ns_epoll::EventItem* item) {
		
		return 0;
	}

}

util.hpp
#pragma once
#include 
#include 
#include 
//#include 
namespace ns_until {

	//设置文件描述符为非阻塞状态
	void SetNoBlock(int sock) {
		int f1 = fcntl(sock, F_GETFL);
		fcntl(sock, F_SETFL, f1 | O_NONBLOCK);
	}
	class StringUtil {
	public:
		static void Split(std::string& in, std::vector* out,std::string sep) {
			while (true) {
				size_t pos = in.find(sep);
				if (pos == std::string::npos) {
					break;
				}

				std::string s = in.substr(0, pos);
				out->push_back(s);
				in.erase(0,pos+sep.size());

			}
		}
		static void Deserialize(std::string& in, int* x, int* y) {
			ssize_t pos = in.find('+');
			std::string left = in.substr(0, pos);
			std::string right = in.substr(pos + 1);

			*x = atoi(left.c_str());
			*y = atoi(right.c_str());


		}
	};
 }

简易demo版,还有差错处理模块和线程池未接入,功能简易,还有很多bug,主要是练习epoll ET模式和Reactor基本思路。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/882647.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号