网络相关的文件简述
tcpconnect代码TcpServer.* : tcp server,用于处理客户端连接,管理客户端连接
Acceptor.* : 接受客户端连接
EventThreadLoop.* : 线程池,每个线程中有一个eventloop做事件循环
InetAddress.* : 包装struct sockaddr_in , 方便对struct sockaddr_in的使用
socketsOps.* : 对网络相关系统api封装,简化使用
socket.* : 封装socketfd,方便对socket的使用
TcpConnection.* : 客户端连接对象
TcpServer::TcpServer(EventLoop *loop,
const InetAddress &listenAddr,
const string &nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback), //将connect的fd添加到poller后调用,默认回调defaultConnectionCallback打印信息
messageCallback_(defaultMessageCallback), //将tcpconnect收到消息调用,默认defaultMessageCallback丢弃所有收到的消息
nextConnId_(1)
{
// newConnection ,acceptor 接收到连接调用,创建tcpconnect对象
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
threadPool_->start(threadInitCallback_); //启动事件循环
assert(!acceptor_->listening());
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_))); //将acceptor中fd添加到poller中,等待客户端连接
}
}
// 作为acceptor接收到连接后回调
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
loop_->assertInLoopThread();
EventLoop *ioLoop = threadPool_->getNextLoop();
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd)); //获得socket服务端地址
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName] = conn;
conn->setConnectionCallback(connectionCallback_); // connectEstablished调用
conn->setMessageCallback(messageCallback_); //读取消息直接丢弃(默认read cb)
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); //添加任务到event loop 中
}
accepor
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
acceptChannel_(loop, acceptSocket_.fd()),
listening_(false),
idleFd_(::open("/dev/null", O_RDonLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr); //至此完成了socket的创建,以及初始化
acceptChannel_.setReadCallback(
std::bind(&Acceptor::handleRead, this)); //设置channel中的回调,此刻完成了channel的初始化,但还没添加到poller中
}
Acceptor::~Acceptor()
{
acceptChannel_.disableAll();
acceptChannel_.remove();
::close(idleFd_);
}
// accepor开始工作
void Acceptor::listen()
{
loop_->assertInLoopThread();
listening_ = true;
acceptSocket_.listen();
acceptChannel_.enableReading(); //添加socket_fd到poller中
}
//接受到新连接
// socketfd上有数据,说明新链接到达
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
// FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr); // accep从该socket上获取新的fd用于和该连接的数据交互
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
// 当连接过多,导致失败时,关闭下一个连接
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE) //文件描述符过多,导致无法创建connfd
{
::close(idleFd_); //关闭idlefd 留出fd 给accept使用
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_); //然后关闭新的连接
idleFd_ = ::open("/dev/null", O_RDonLY | O_CLOEXEC); //继续打开fd
}
}
}
这里讲解下idleFd_ 作用,我们知道对socket_fd accept可以获得一个新的fd用于和远端通信,如果远端连接过多,可能会消耗完所有的文件描述符fd。accept处理连接时,若出现 EMFILE 错误不进行处理,则内核间隔性尝试连接,导致整个网络设计程序崩溃。这时候关闭idleFd_,留出fd给accept使用,然后关闭fd,断开和远端的连接,最后再打开idleFd_ ,以此关闭超过最大数量的远端连接,保证网络不崩溃。



