栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

muduo库网络部分

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

muduo库网络部分

网络相关的文件简述

TcpServer.* : tcp server,用于处理客户端连接,管理客户端连接
Acceptor.* : 接受客户端连接
EventThreadLoop.* : 线程池,每个线程中有一个eventloop做事件循环
InetAddress.* : 包装struct sockaddr_in , 方便对struct sockaddr_in的使用
socketsOps.* : 对网络相关系统api封装,简化使用
socket.* : 封装socketfd,方便对socket的使用
TcpConnection.* : 客户端连接对象

tcpconnect代码
TcpServer::TcpServer(EventLoop *loop,
                     const InetAddress &listenAddr,
                     const string &nameArg,
                     Option option)
    : loop_(CHECK_NOTNULL(loop)),
      ipPort_(listenAddr.toIpPort()),
      name_(nameArg),
      acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
      threadPool_(new EventLoopThreadPool(loop, name_)),
      connectionCallback_(defaultConnectionCallback), //将connect的fd添加到poller后调用,默认回调defaultConnectionCallback打印信息
      messageCallback_(defaultMessageCallback),       //将tcpconnect收到消息调用,默认defaultMessageCallback丢弃所有收到的消息
      nextConnId_(1)
{
  // newConnection ,acceptor 接收到连接调用,创建tcpconnect对象
  acceptor_->setNewConnectionCallback(
      std::bind(&TcpServer::newConnection, this, _1, _2));
}

void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_); //启动事件循环

    assert(!acceptor_->listening());
    loop_->runInLoop(
        std::bind(&Acceptor::listen, get_pointer(acceptor_))); //将acceptor中fd添加到poller中,等待客户端连接
  }
}

// 作为acceptor接收到连接后回调
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop *ioLoop = threadPool_->getNextLoop();
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd)); //获得socket服务端地址
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_); // connectEstablished调用
  conn->setMessageCallback(messageCallback_);       //读取消息直接丢弃(默认read cb)
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1));                 // FIXME: unsafe
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); //添加任务到event loop 中
}

accepor
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop),
      acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
      acceptChannel_(loop, acceptSocket_.fd()),
      listening_(false),
      idleFd_(::open("/dev/null", O_RDonLY | O_CLOEXEC))
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr); //至此完成了socket的创建,以及初始化
  acceptChannel_.setReadCallback(
      std::bind(&Acceptor::handleRead, this)); //设置channel中的回调,此刻完成了channel的初始化,但还没添加到poller中
}

Acceptor::~Acceptor()
{
  acceptChannel_.disableAll();
  acceptChannel_.remove();
  ::close(idleFd_);
}

// accepor开始工作
void Acceptor::listen()
{
  loop_->assertInLoopThread();
  listening_ = true;
  acceptSocket_.listen();
  acceptChannel_.enableReading(); //添加socket_fd到poller中
}

//接受到新连接
// socketfd上有数据,说明新链接到达
void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  // FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr); // accep从该socket上获取新的fd用于和该连接的数据交互
  if (connfd >= 0)
  {
    // string hostport = peerAddr.toIpPort();
    // LOG_TRACE << "Accepts of " << hostport;
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);
    }
    else
    {
      sockets::close(connfd);
    }
  }
  else
  {
    // 当连接过多,导致失败时,关闭下一个连接
    LOG_SYSERR << "in Acceptor::handleRead";
    // Read the section named "The special problem of
    // accept()ing when you can't" in libev's doc.
    // By Marc Lehmann, author of libev.
    if (errno == EMFILE) //文件描述符过多,导致无法创建connfd
    {
      ::close(idleFd_); //关闭idlefd 留出fd 给accept使用
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);                                    //然后关闭新的连接
      idleFd_ = ::open("/dev/null", O_RDonLY | O_CLOEXEC); //继续打开fd
    }
  }
}

这里讲解下idleFd_ 作用,我们知道对socket_fd accept可以获得一个新的fd用于和远端通信,如果远端连接过多,可能会消耗完所有的文件描述符fd。accept处理连接时,若出现 EMFILE 错误不进行处理,则内核间隔性尝试连接,导致整个网络设计程序崩溃。这时候关闭idleFd_,留出fd给accept使用,然后关闭fd,断开和远端的连接,最后再打开idleFd_ ,以此关闭超过最大数量的远端连接,保证网络不崩溃。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/727791.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号