栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

(二)Reactor模式

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

(二)Reactor模式

系列目录
第01篇 主线程与工作线程的分工
第02篇 Reactor模式
第03篇 一个服务器程序的架构介绍
第04篇 如何将socket设置为非阻塞模式
第05篇 如何编写高性能日志
第06篇 关于网络编程的一些实用技巧和细节
第07篇 开源一款即时通讯软件的源码
第08篇 高性能服务器架构设计总结1
第09篇 高性能服务器架构设计总结2
第10篇 高性能服务器架构设计总结3
第11篇 高性能服务器架构设计总结4


最近一直在看游双的《高性能linux服务器编程》一书,下载链http://download.csdn.net/detail/analogous_love/9673008

书上是这么介绍Reactor模式的:

按照这个思路,我写个简单的练习:


#include 
#include 
#include 
#include 
#include 
#include   //for htonl() and htons()
#include 
#include 
#include 
#include      //for signal()
#include 
#include 
#include 
#include 
#include 
#include 
#include  //for std::setw()/setfill()
#include 

#define WORKER_THREAD_NUM   5

#define min(a, b) ((a <= b) ? (a) : (b)) 

int g_epollfd = 0;
bool g_bStop = false;
int g_listenfd = 0;
pthread_t g_acceptthreadid = 0;
pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };
pthread_cond_t g_acceptcond;
pthread_mutex_t g_acceptmutex;

pthread_cond_t g_cond ;
pthread_mutex_t g_mutex ;

pthread_mutex_t g_clientmutex;

std::list g_listClients;

void prog_exit(int signo)
{
    ::signal(SIGINT, SIG_IGN);
    //::signal(SIGKILL, SIG_IGN);//该信号不能被阻塞、处理或者忽略
    ::signal(SIGTERM, SIG_IGN);

    std::cout << "program recv signal " << signo << " to exit." << std::endl;

    g_bStop = true;

    ::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);

    //TODO: 是否需要先调用shutdown()一下?
    ::shutdown(g_listenfd, SHUT_RDWR);
    ::close(g_listenfd);
    ::close(g_epollfd);

    ::pthread_cond_destroy(&g_acceptcond);
    ::pthread_mutex_destroy(&g_acceptmutex);

    ::pthread_cond_destroy(&g_cond);
    ::pthread_mutex_destroy(&g_mutex);

    ::pthread_mutex_destroy(&g_clientmutex);
}

bool create_server_listener(const char* ip, short port)
{
    g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (g_listenfd == -1)
 return false;

    int on = 1;
    ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
    ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr)); 
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    if (::bind(g_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)
 return false;

    if (::listen(g_listenfd, 50) == -1)
 return false;

    g_epollfd = ::epoll_create(1);
    if (g_epollfd == -1)
 return false;

    struct epoll_event e;
    memset(&e, 0, sizeof(e));
    e.events = EPOLLIN | EPOLLRDHUP;
    e.data.fd = g_listenfd;
    if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)
 return false;

    return true;
}

void release_client(int clientfd)
{
    if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)
 std::cout << "release client socket failed as call epoll_ctl failed" << std::endl;

    ::close(clientfd);
}

void* accept_thread_func(void* arg)
{   
    while (!g_bStop)
    {
 ::pthread_mutex_lock(&g_acceptmutex);
 ::pthread_cond_wait(&g_acceptcond, &g_acceptmutex);
 //::pthread_mutex_lock(&g_acceptmutex);

 //std::cout << "run loop in accept_thread_func" << std::endl;

 struct sockaddr_in clientaddr;
 socklen_t addrlen;
 int newfd = ::accept(g_listenfd, (struct sockaddr *)&clientaddr, &addrlen);
 ::pthread_mutex_unlock(&g_acceptmutex);
 if (newfd == -1)
     continue;

 std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;

 //将新socket设置为non-blocking
 int oldflag = ::fcntl(newfd, F_GETFL, 0);
 int newflag = oldflag | O_NONBLOCK;
 if (::fcntl(newfd, F_SETFL, newflag) == -1)
 {
     std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;
     continue;
 }

 struct epoll_event e;
 memset(&e, 0, sizeof(e));
 e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
 e.data.fd = newfd;
 if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)
 {
     std::cout << "epoll_ctl error, fd =" << newfd << std::endl;
 }
    }

    return NULL;
}

void* worker_thread_func(void* arg)
{   
    while (!g_bStop)
    {
 int clientfd;
 ::pthread_mutex_lock(&g_clientmutex);
 while (g_listClients.empty())
     ::pthread_cond_wait(&g_cond, &g_clientmutex);
 clientfd = g_listClients.front();
 g_listClients.pop_front();  
 pthread_mutex_unlock(&g_clientmutex);

 //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
 std::cout << std::endl;

 std::string strclientmsg;
 char buff[256];
 bool bError = false;
 while (true)
 {
     memset(buff, 0, sizeof(buff));
     int nRecv = ::recv(clientfd, buff, 256, 0);
     if (nRecv == -1)
     {
  if (errno == EWOULDBLOCK)
      break;
  else
  {
      std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;
      release_client(clientfd);
      bError = true;
      break;
  }

     }
     //对端关闭了socket,这端也关闭。
     else if (nRecv == 0)
     {
  std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;
  release_client(clientfd);
  bError = true;
  break;
     }

     strclientmsg += buff;
 }

 //出错了,就不要再继续往下执行了
 if (bError)
     continue;

 std::cout << "client msg: " << strclientmsg;

 //将消息加上时间标签后发回
 time_t now = time(NULL);
 struct tm* nowstr = localtime(&now);
 std::ostringstream ostimestr;
 ostimestr << "[" << nowstr->tm_year + 1900 << "-" 
    << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-" 
    << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "
    << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":" 
    << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":" 
    << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";

 strclientmsg.insert(0, ostimestr.str());

 while (true)
 {
     int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);
     if (nSent == -1)
     {
  if (errno == EWOULDBLOCK)
  {
      ::sleep(10);
      continue;
  }
  else
  {
      std::cout << "send error, fd = " << clientfd << std::endl;
      release_client(clientfd);
      break;
  }

     }   

     std::cout << "send: " << strclientmsg;
     strclientmsg.erase(0, nSent);

     if (strclientmsg.empty())
  break;
 }
    }

    return NULL;
}

void daemon_run()
{
    int pid;
    signal(SIGCHLD, SIG_IGN);
    //1)在父进程中,fork返回新创建子进程的进程ID;
    //2)在子进程中,fork返回0;
    //3)如果出现错误,fork返回一个负值;
    pid = fork();
    if (pid < 0)
    {
 std:: cout << "fork error" << std::endl;
 exit(-1);
    }
    //父进程退出,子进程独立运行
    else if (pid > 0) {
 exit(0);
    }
    //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,
    //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。
    //执行setsid()之后,child将重新获得一个新的会话(session)id。
    //这时parent退出之后,将不会影响到child了。
    setsid();
    int fd;
    fd = open("/dev/null", O_RDWR, 0);
    if (fd != -1)
    {
 dup2(fd, STDIN_FILENO);
 dup2(fd, STDOUT_FILENO);
 dup2(fd, STDERR_FILENO);
    }
    if (fd > 2)
 close(fd);

}

int main(int argc, char* argv[])
{  
    short port = 0;
    int ch;
    bool bdaemon = false;
    while ((ch = getopt(argc, argv, "p:d")) != -1)
    {
 switch (ch)
 {
 case 'd':
     bdaemon = true;
     break;
 case 'p':
     port = atol(optarg);
     break;
 }
    }

    if (bdaemon)
 daemon_run();

    if (port == 0)
 port = 12345;

    if (!create_server_listener("0.0.0.0", port))
    {
 std::cout << "Unable to create listen server: ip=0.0.0.0, port=" << port << "." << std::endl;
 return -1;
    }

    //设置信号处理
    signal(SIGCHLD, SIG_DFL);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT, prog_exit);
    //signal(SIGKILL, prog_exit);//该信号不能被阻塞、处理或者忽略

    signal(SIGTERM, prog_exit);

    ::pthread_cond_init(&g_acceptcond, NULL);
    ::pthread_mutex_init(&g_acceptmutex, NULL);

    ::pthread_cond_init(&g_cond, NULL);
    ::pthread_mutex_init(&g_mutex, NULL);

    ::pthread_mutex_init(&g_clientmutex, NULL);

    ::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);
    //启动工作线程
    for (int i = 0; i < WORKER_THREAD_NUM; ++i)
    {
 ::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);
    }

    while (!g_bStop)
    {
 struct epoll_event ev[1024];
 int n = ::epoll_wait(g_epollfd, ev, 1024, 10);
 if (n == 0)
     continue;
 else if (n < 0)
 {
     std::cout << "epoll_wait error" << std::endl;
     continue;
 }

 int m = min(n, 1024);
 for (int i = 0; i < m; ++i)
 {
     //通知接收连接线程接收新连接
     if (ev[i].data.fd == g_listenfd)
  pthread_cond_signal(&g_acceptcond);
     //通知普通工作线程接收数据
     else
     { 
  pthread_mutex_lock(&g_clientmutex);
  g_listClients.push_back(ev[i].data.fd);
  pthread_mutex_unlock(&g_clientmutex);
  pthread_cond_signal(&g_cond);
  //std::cout << "signal" << std::endl;
     }

 }

    }

    return 0;
}

程序的功能一个简单的echo服务:客户端连接上服务器之后,给服务器发送信息,服务器加上时间戳等信息后返回给客户端。

使用到的知识点有:

1.条件变量

2.epoll的边缘触发模式

程序的大致框架是:

1.主线程只负责监听侦听socket上是否有新连接,如果有新连接到来,交给一个叫accept的工作线程去接收新连接,并将新连接socket绑定到主线程使用epollfd上去。

2.主线程如果侦听到客户端的socket上有可读事件,则通知另外五个工作线程去接收处理客户端发来的数据,并将数据加上时间戳后发回给客户端。

3.可以通过传递-p port来设置程序的监听端口号;可以通过传递-d来使程序以daemon模式运行在后台。这也是标准linux daemon模式的书写方法。

程序难点和需要注意的地方是:

1.条件变量为了防止虚假唤醒,一定要在一个循环里面调用pthread_cond_wait()函数,我在worker_thread_func()中使用了:

while (g_listClients.empty())
     ::pthread_cond_wait(&g_cond, &g_clientmutex);

在accept_thread_func()函数里面我没有使用循环,这样会有问题吗?

2.使用条件变量pthread_cond_wait()函数的时候一定要先获得与该条件变量相关的mutex,即像下面这样的结构:

mutex_lock(...);

while (condition is true)
    ::pthread_cond_wait(...);

//这里可以有其他代码...
mutex_unlock(...);

//这里可以有其他代码...

因为pthread_cond_wait()如果阻塞的话,它解锁相关mutex和阻塞当前线程这两个动作加在一起是原子的。

3.为服务器端程序最好对侦听socket调用setsocketopt()设置SO_REUSEADDR和SO_REUSEPORT两个标志,因为服务程序有时候会需要重启(比如调试的时候就会不断重启),如果不设置这两个标志的话,绑定端口时就会调用失败。因为一个端口使用后,即使不再使用,因为四次挥手该端口处于TIME_WAIT状态,有大约2min的MSL(Maximum Segment Lifetime,最大存活期)。这2min内,该端口是不能被重复使用的。你的服务器程序上次使用了这个端口号,接着重启,因为这个缘故,你再次绑定这个端口就会失败(bind函数调用失败)。要不你就每次重启时需要等待2min后再试(这在频繁重启程序调试是难以接收的),或者设置这种SO_REUSEADDR和SO_REUSEPORT立即回收端口使用。
其实,SO_REUSEADDR在windows上和Unix平台上还有些细微的区别,我在libevent源码中看到这样的描述:

int evutil_make_listen_socket_reuseable(evutil_socket_t sock)
{
#ifndef WIN32
    int one = 1;
    
    return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,
 (ev_socklen_t)sizeof(one));
#else
    return 0;
#endif
}

注意注释部分,在Unix平台上设置这个选项意味着,任意进程可以复用该地址;而在windows,不要阻止其他进程复用该地址。也就是在在Unix平台上,如果不设置这个选项,任意进程在一定时间内,不能bind该地址;在windows平台上,在一定时间内,其他进程不能bind该地址,而本进程却可以再次bind该地址。

4.epoll_wait对新连接socket使用的是边缘触发模式EPOLLET(edge trigger),而不是默认的水平触发模式(level trigger)。因为如果采取水平触发模式的话,主线程检测到某个客户端socket数据可读时,通知工作线程去收取该socket上的数据,这个时候主线程继续循环,只要在工作线程没有将该socket上数据全部收完,或者在工作线程收取数据的过程中,客户端有新数据到来,主线程会继续发通知(通过pthread_cond_signal())函数,再次通知工作线程收取数据。这样会可能导致多个工作线程同时调用recv函数收取该客户端socket上的数据,这样产生的结果将会导致数据错乱。
相反,采取边缘触发模式,只有等某个工作线程将那个客户端socket上数据全部收取完毕,主线程的epoll_wait才可能会再次触发来通知工作线程继续收取那个客户端socket新来的数据。

5.代码中有这样一行:

//gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
 std::cout << std::endl;

如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出就不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。
程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法:
linux:
nc 120.55.94.78 12345

telnet 120.55.94.78 12345
然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:

另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:

CMakeLists.txt代码如下:

cmake_minimum_required(VERSION 2.8)

PROJECT(myreactorserver)

AUX_SOURCE_DIRECTORY(./ SRC_LIST)
SET(EXECUTABLE_OUTPUT_PATH ./)

ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64 -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR -DAC_HAS_CRITICAL -DTIXML_USE_STL -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS} -std=c++11)

INCLUDE_DIRECTORIES(
./
)
link_DIRECTORIES(
./
)

set(
main.cpp
myreator.cpp
)

ADD_EXECUTABLE(myreactorserver ${SRC_LIST})

TARGET_link_LIBRARIES(myreactorserver pthread)

myreactor.h文件内容:

#ifndef __MYREACTOR_H__
#define __MYREACTOR_H__

#include 
#include 
#include 
#include 
#include 

#define WORKER_THREAD_NUM   5

class CMyReactor
{
public:
    CMyReactor();
    ~CMyReactor();

    bool init(const char* ip, short nport);
    bool uninit();

    bool close_client(int clientfd);

    static void* main_loop(void* p);

private:
    //no copyable
    CMyReactor(const CMyReactor& rhs);
    CMyReactor& operator = (const CMyReactor& rhs);

    bool create_server_listener(const char* ip, short port);

    static void accept_thread_proc(CMyReactor* pReatcor);
    static void worker_thread_proc(CMyReactor* pReatcor);

private:
    //C11语法可以在这里初始化
    int     m_listenfd = 0;
    int     m_epollfd  = 0;
    bool    m_bStop    = false;

    std::shared_ptr m_acceptthread;
    std::shared_ptr m_workerthreads[WORKER_THREAD_NUM];

    std::condition_variable      m_acceptcond;
    std::mutex     m_acceptmutex;

    std::condition_variable      m_workercond ;
    std::mutex     m_workermutex;

    std::list m_listClients;
};

#endif //!__MYREACTOR_H__

myreactor.cpp文件内容:

#include "myreactor.h"
#include 
#include 
#include 
#include 
#include 
#include   //for htonl() and htons()
#include 
#include 
#include 
#include 
#include 
#include 
#include  //for std::setw()/setfill()
#include 

#define min(a, b) ((a <= b) ? (a) : (b))

CMyReactor::CMyReactor()
{
    //m_listenfd = 0;
    //m_epollfd = 0;
    //m_bStop = false;
}

CMyReactor::~CMyReactor()
{

}

bool CMyReactor::init(const char* ip, short nport)
{
    if (!create_server_listener(ip, nport))
    {
 std::cout << "Unable to bind: " << ip << ":" << nport << "." << std::endl;
 return false;
    }

    std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;

    //启动接收新连接的线程
    m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));

    //启动工作线程
    for (auto& t : m_workerthreads)
    {
 t.reset(new std::thread(CMyReactor::worker_thread_proc, this));
    }

    return true;
}

bool CMyReactor::uninit()
{
    m_bStop = true;
    m_acceptcond.notify_one();
    m_workercond.notify_all();

    m_acceptthread->join();
    for (auto& t : m_workerthreads)
    {
 t->join();
    }

    ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);

    //TODO: 是否需要先调用shutdown()一下?
    ::shutdown(m_listenfd, SHUT_RDWR);
    ::close(m_listenfd);
    ::close(m_epollfd);

    return true;
}

bool CMyReactor::close_client(int clientfd)
{
    if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)
    {
 std::cout << "close client socket failed as call epoll_ctl failed" << std::endl;
 //return false;
    }

    ::close(clientfd);

    return true;
}

void* CMyReactor::main_loop(void* p)
{
    std::cout << "main thread id = " << std::this_thread::get_id() << std::endl;

    CMyReactor* pReatcor = static_cast(p);

    while (!pReatcor->m_bStop)
    {
 struct epoll_event ev[1024];
 int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);
 if (n == 0)
     continue;
 else if (n < 0)
 {
     std::cout << "epoll_wait error" << std::endl;
     continue;
 }

 int m = min(n, 1024);
 for (int i = 0; i < m; ++i)
 {
     //通知接收连接线程接收新连接
     if (ev[i].data.fd == pReatcor->m_listenfd)
  pReatcor->m_acceptcond.notify_one();
     //通知普通工作线程接收数据
     else
     {
  {
      std::unique_lock guard(pReatcor->m_workermutex);
      pReatcor->m_listClients.push_back(ev[i].data.fd);
  }

  pReatcor->m_workercond.notify_one();
  //std::cout << "signal" << std::endl;
     }// end if

 }// end for-loop
    }// end while

    std::cout << "main loop exit ..." << std::endl;

    return NULL;
}

void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)
{
    std::cout << "accept thread, thread id = " << std::this_thread::get_id() << std::endl;

    while (true)
    {
 int newfd;
 struct sockaddr_in clientaddr;
 socklen_t addrlen;
 {
     std::unique_lock guard(pReatcor->m_acceptmutex);
     pReatcor->m_acceptcond.wait(guard);
     if (pReatcor->m_bStop)
  break;

     //std::cout << "run loop in accept_thread_proc" << std::endl;

     newfd = ::accept(pReatcor->m_listenfd, (struct sockaddr *)&clientaddr, &addrlen);
 }
 if (newfd == -1)
     continue;

 std::cout << "new client connected: " << ::inet_ntoa(clientaddr.sin_addr) << ":" << ::ntohs(clientaddr.sin_port) << std::endl;

 //将新socket设置为non-blocking
 int oldflag = ::fcntl(newfd, F_GETFL, 0);
 int newflag = oldflag | O_NONBLOCK;
 if (::fcntl(newfd, F_SETFL, newflag) == -1)
 {
     std::cout << "fcntl error, oldflag =" << oldflag << ", newflag = " << newflag << std::endl;
     continue;
 }

 struct epoll_event e;
 memset(&e, 0, sizeof(e));
 e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
 e.data.fd = newfd;
 if (::epoll_ctl(pReatcor->m_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)
 {
     std::cout << "epoll_ctl error, fd =" << newfd << std::endl;
 }
    }

    std::cout << "accept thread exit ..." << std::endl;
}

void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
{
    std::cout << "new worker thread, thread id = " << std::this_thread::get_id() << std::endl;

    while (true)
    {
 int clientfd;
 {
     std::unique_lock guard(pReatcor->m_workermutex);
     while (pReatcor->m_listClients.empty())
     {
  if (pReatcor->m_bStop)
  {
      std::cout << "worker thread exit ..." << std::endl;
      return;
  }

  pReatcor->m_workercond.wait(guard);
     }

     clientfd = pReatcor->m_listClients.front();
     pReatcor->m_listClients.pop_front();
 }

 //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来
 std::cout << std::endl;

 std::string strclientmsg;
 char buff[256];
 bool bError = false;
 while (true)
 {
     memset(buff, 0, sizeof(buff));
     int nRecv = ::recv(clientfd, buff, 256, 0);
     if (nRecv == -1)
     {
  if (errno == EWOULDBLOCK)
      break;
  else
  {
      std::cout << "recv error, client disconnected, fd = " << clientfd << std::endl;
      pReatcor->close_client(clientfd);
      bError = true;
      break;
  }

     }
     //对端关闭了socket,这端也关闭。
     else if (nRecv == 0)
     {
  std::cout << "peer closed, client disconnected, fd = " << clientfd << std::endl;
  pReatcor->close_client(clientfd);
  bError = true;
  break;
     }

     strclientmsg += buff;
 }

 //出错了,就不要再继续往下执行了
 if (bError)
     continue;

 std::cout << "client msg: " << strclientmsg;

 //将消息加上时间标签后发回
 time_t now = time(NULL);
 struct tm* nowstr = localtime(&now);
 std::ostringstream ostimestr;
 ostimestr << "[" << nowstr->tm_year + 1900 << "-"
     << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"
     << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "
     << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"
     << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"
     << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";

 strclientmsg.insert(0, ostimestr.str());

 while (true)
 {
     int nSent = ::send(clientfd, strclientmsg.c_str(), strclientmsg.length(), 0);
     if (nSent == -1)
     {
  if (errno == EWOULDBLOCK)
  {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
  }
  else
  {
      std::cout << "send error, fd = " << clientfd << std::endl;
      pReatcor->close_client(clientfd);
      break;
  }

     }

     std::cout << "send: " << strclientmsg;
     strclientmsg.erase(0, nSent);

     if (strclientmsg.empty())
  break;
 }
    }
}

bool CMyReactor::create_server_listener(const char* ip, short port)
{
    m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (m_listenfd == -1)
 return false;

    int on = 1;
    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on));

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    if (::bind(m_listenfd, (sockaddr *)&servaddr, sizeof(servaddr)) == -1)
 return false;

    if (::listen(m_listenfd, 50) == -1)
 return false;

    m_epollfd = ::epoll_create(1);
    if (m_epollfd == -1)
 return false;

    struct epoll_event e;
    memset(&e, 0, sizeof(e));
    e.events = EPOLLIN | EPOLLRDHUP;
    e.data.fd = m_listenfd;
    if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)
 return false;

    return true;
}

main.cpp文件内容:


#include 
#include      //for signal()
#include
#include      //for exit()
#include 
#include 
#include 
#include "myreactor.h"

CMyReactor g_reator;

void prog_exit(int signo)
{
    std::cout << "program recv signal " << signo << " to exit." << std::endl; 

    g_reator.uninit();
}

void daemon_run()
{
    int pid;
    signal(SIGCHLD, SIG_IGN);
    //1)在父进程中,fork返回新创建子进程的进程ID;
    //2)在子进程中,fork返回0;
    //3)如果出现错误,fork返回一个负值;
    pid = fork();
    if (pid < 0)
    {
 std:: cout << "fork error" << std::endl;
 exit(-1);
    }
    //父进程退出,子进程独立运行
    else if (pid > 0) {
 exit(0);
    }
    //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,
    //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。
    //执行setsid()之后,child将重新获得一个新的会话(session)id。
    //这时parent退出之后,将不会影响到child了。
    setsid();
    int fd;
    fd = open("/dev/null", O_RDWR, 0);
    if (fd != -1)
    {
 dup2(fd, STDIN_FILENO);
 dup2(fd, STDOUT_FILENO);
 dup2(fd, STDERR_FILENO);
    }
    if (fd > 2)
 close(fd);
}

int main(int argc, char* argv[])
{  
    //设置信号处理
    signal(SIGCHLD, SIG_DFL);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT, prog_exit);
    signal(SIGKILL, prog_exit);
    signal(SIGTERM, prog_exit);

    short port = 0;
    int ch;
    bool bdaemon = false;
    while ((ch = getopt(argc, argv, "p:d")) != -1)
    {
 switch (ch)
 {
 case 'd':
     bdaemon = true;
     break;
 case 'p':
     port = atol(optarg);
     break;
 }
    }

    if (bdaemon)
 daemon_run();

    if (port == 0)
 port = 12345;

    if (!g_reator.init("0.0.0.0", 12345))
 return -1;

    g_reator.main_loop(&g_reator);

    return 0;
}

完整实例代码下载地址:

普通版本:https://pan.baidu.com/s/1o82Mkno

C++11版本:https://pan.baidu.com/s/1dEJdrih


您可以接着阅读下一篇:《 一个服务器程序的架构介绍》

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/232746.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号