文章目录
- muduo网络库学习—多线程服务器构造(6)
- 前言
- 一、Acceptor类
- 二、应用层缓冲区Buffer
- 2.1 为什么在非阻塞网络编程中,应用程buffer是必须的?
- 2.2 Buffer的实现(代码从略):
- 2.3 epoll使用LT模式的原因:
- 2.4 muduo Buffer的不足
- 三、线程池EventLoopThreadPool
- 四、TcpServer接收新连接,断开连接
- 五 TcpConnection
- 总结
前言
本节主要介绍muduo多线程服务器是如何构造的,尚未加入应用层的缓冲区。muduo多线程服务器主要有Acceptor(监听套接字),TcpConnection(连接套接字的封装类),Tcpserver(服务器主体),EventLoopThreadPool(线程池),Buffer(应用层缓冲区)。难点在于生命期的管理以及如何分配IO时间到多线程且保持线程安全。
一、Acceptor类
它是一个内部类,供TcpServer使用,生命期由后者控制。Acceptor的数据成员包括Socket、Channel,Acceptor的socket是listening socket(即server socket)。Channel用于观察此socket的readable事件,并回调Accptor::handleRead(),后者调用accept(2)来接受新连接,并回调用户callback。它是一个内部类,供TcpServer使用,生命期由后者控制。
Acceptor接口如下:
#ifndef ACCEPTOR_H #define ACCEPTOR_H #include "Channel.h" #include "Eventloop.h" #include "Socket.h" #include "InetAddress.h" #include "../base/noncopyable.h" #includenamespace muduo{ namespace net{ class Acceptor{ public: typedef std::function NewConnectionCallback; Acceptor(Eventloop*loop,const InetAddress&addr,bool portReuse); ~Acceptor(); void setNewConnectionCallback(const NewConnectionCallback&cb){ newConnectionCallback_=cb; } void listen(); bool listening(){ return listening_; } private: void handleread(); Eventloop*loop_; Socket acceptSocket_; Channel acceptChannel_; NewConnectionCallback newConnectionCallback_; bool listening_; int idleFd_; }; } } #endif
Acceptor的构造函数和listen成员函数执行创建TCP服务端的传统步骤即调用Socket,bind,listen等Sockets API,其中任何一个错误都会导致程序的中止。同时考虑到文件描述符用尽的情况预先保留了一个文件描述符(在此析构函数中关闭)。
Acceptor::Acceptor(Eventloop*loop,const InetAddress&addr,bool portReuse):
loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie()),
acceptChannel_(loop_,acceptSocket_.fd()),
listening_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_>=0);
acceptSocket_.bindAddress(addr);
acceptSocket_.setRuseAddr(portReuse);
acceptChannel_.setReadCallBack(std::bind(&Acceptor::handleread,this));
}
void Acceptor::listen(){
loop_->assertInLoopThread();
listening_=true;
acceptSocket_.listen();
acceptChannel_.enableReading();
}
listen的最后一步让acceptChannel_在socket可读的时候调用成员函数handleread()函数,后者会接收accept,并回调newConnectionCallback_。
void Acceptor::handleread(){
loop_->assertInLoopThread();
InetAddress peerAddr;
int connfd=acceptSocket_.accept(peerAddr);
if(connfd>=0){
if(newConnectionCallback_){
newConnectionCallback_(connfd,peerAddr);
}
else{
::close(connfd);
}
}
else{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}
二、应用层缓冲区Buffer
2.1 为什么在非阻塞网络编程中,应用程buffer是必须的?
非阻塞IO的核心是避免阻塞在read()或者write()或者其他系统调用上,这样可以最大程度上使用线程,让一个线程服务于多个socket。IO线程只能阻塞在IO复用函数上,如select/Poll/Epoll。这样一来,应用层缓冲区是必要的,每个Tcp Socket都要有inputBuffer,以及outputBuffer。
outputBuffer: 一个常见的场景:程序想通过Tcp给客户端发送100KB的数据,但是在write()调用中,操作系统只接受了80KB,就是还有20KB的数据没有发送,但是最好不要原地等待,因为你不知道需要等多久,这个主要取决于对方什么时候接收数据。所以程序应该尽快的交出控制权,返回EventLoop,在这种情况下剩余的20KB数据需要存放到outputBuffer中。
引入了outputBuffer我们需要注意几个事情:
(1)当outputBuffer中还有数据的时候,我们再次发送数据,此时的数据必须存放于outputBuffer中,直至socket变得可写了在一并写入。
(2)当outputBuffer中还有数据的时候,服务端想关闭,此时必须等到outputBuffer中数据发送完毕才能关闭服务端,因为对用用户来说,用户进行send之后,默认数据已经发送完毕。
(3)针对于大流量事件设置了高水位回调以及低水位回调,低水位回调必须当上一次发送的数据全部发送给内核缓冲区才能触发而低水位回调函数。如果不设置低水位回调函数,outputBuffer有可能会被无线扩大。而高水位回调函数是当outputBuffer的writeidx达到一定程度时候触发的回调函数。
在TcpConnenction中对应的代码如下:
void TcpConnection::sendInloop(const void*str,size_t len){
loop_->assertInLoopThread();
bool error=false;
//暂时不需要用到应用层缓冲区
size_t havesend=0;
size_t remain=len;
if (state_ == KDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
if(!channel_->isWriting()&&outputBuffer_.readableBytes()==0){
havesend=::write(channel_->fd(),str,len);
if(havesend<0){
if(errno!=EAGAIN){
havesend=0;
LOG_SYSERR << "TcpConnection::sendInLoop";
if(errno==EPIPE){
error=true;
}
}
}
else if(havesend>=0){
remain=len-havesend;
if(remain==0&&writeCompleteCallback_){
loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));
}
}
}
assert(remain<=len);
if(remain&&!error){
int oldLen=outputBuffer_.readableBytes();
if(remain+oldLen>=highWaterMark_&&oldLen
loop_->queueInLoop(std::bind(highWaterMarkCallback_,shared_from_this(),remain+oldLen));
}
outputBuffer_.append(str+havesend,remain);
if(!channel_->isWriting()){
channel_->enableWriting();// 关注POLLOUT事件
}
}
}
void TcpConnection::shutdownInLoop()
{
loop_->assertInLoopThread();
if (!channel_->isWriting())
{
// we are not writing
socket_->shutdownWrite();
}
}
// 内核发送缓冲区有空间了,回调该函数
void TcpConnection::handleWrite(){
loop_->assertInLoopThread();
if(channel_->isWriting()){
size_t havesend=0;
size_t remain=0;
bool error=false;
havesend =::write(channel_->fd(),outputBuffer_.peek(),outputBuffer_.readableBytes());
if(havesend<0){
if(errno!=EAGAIN){
havesend=0;
LOG_SYSERR << "TcpConnection::handleWrite";
if(errno==EPIPE){
error=true;
}
}
}
else if(havesend>=0){
outputBuffer_.retrieve(havesend);
remain=outputBuffer_.readableBytes();
if(remain==0&&writeCompleteCallback_){
channel_->disableWriting();
loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));
}
if (remain==0&&state_ == KDisconnecting) // 发送缓冲区已清空并且连接状态是kDisconnecting, 要关闭连接
{
shutdownInLoop(); // 关闭连接
}
}
if(!error&&remain>0){
LOG_TRACE << "I am going to write more data";
}
}
else{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
inputBuffer: Tcp是一个无边界的字节流协议,接收方必须要处理“收到的数据不构成一条完整的消息”和:一次收到两条消息数据的情况。在muduo处理socket可读事件的时候必须一次性把socket里的数据读完,也就是从内核缓冲区中把数据全部拷贝到用户缓冲区中,否则会持续触发EPLOLLN事件,造成busy-loop。那么muduo应对数据不完整的情况的方法是将数据先暂时存到inputBuffer中,等构成一条完整的消息的时候在通知程序进行业务逻辑。
代码如下:
ssize_t Buffer::readFd(int fd, int* savedErrno){
char temp[64*1024];
struct iovec iov[2];
iov[0].iov_base= beginWrite();
iov[0].iov_len=writeableBytes();
iov[1].iov_base=temp;
iov[1].iov_len=64*1024;
ssize_t n=::readv(fd,iov,2);
if(n<0){
savedErrno=&errno;
}
else if(n<=writeableBytes()){
writerIndex_+=n;
}
else{
writerIndex_=buffer_.size();
append(beginWrite(),n-writeableBytes());
}
return n;
}
void TcpConnection::handleRead(Timestamp time){
LOG_INFO<<"handle read";
loop_->assertInLoopThread();
int savedErrno=0;
int n=inputBuffer_.readFd(channel_->fd(),&savedErrno);
if(n>0){
messageCallback_(shared_from_this(),&inputBuffer_,time);
}
else if(n==0){
handleClose();
}
else{
savedErrno=errno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
2.2 Buffer的实现(代码从略):
2.3 epoll使用LT模式的原因:
1.与poll兼容
2.LT模式不会发生漏掉事件的BUG,但POLLOUT事件不能一开始就关注,否则会出现busy loop,而应该在write无法完全写入内核缓冲区的时候才关注,将未写入内核缓冲区的数据添加到应用层output buffer,直到应用层output buffer写完,停止关注POLLOUT事件。
3.读写的时候不必等候EAGAIN,可以节省系统调用次数,降低延迟。(注:如果用ET模式,读的时候读到EAGAIN,写的时候直到output buffer写完或者EAGAIN)
动态扩容的时候存在内存拷贝问题,需要连续的内存空间,如果采用链表进行管理的话代价是代码变得晦涩难懂。
三、线程池EventLoopThreadPool用one loop per thread的思想实现多线程TcpServer的关键步骤是在新建TcpConnection时候从Event loop pool中选出一个loop给TcpConnection用。也就是说多线程TcpServer自己的EventLoop只用来接收新连接,而新连接会用其他的EventLoop来执行IO(单线程TcpServer的EventLoop与TcpConnection是共享的)。
代码接口如下:
#ifndef EVENTLOOPTHREADPOOL_H #define EVENTLOOPTHREADPOOL_H #include "Eventloop.h" #include "EventLoopThread.h" #include#include #include namespace muduo{ namespace net{ class EventLoopThreadPool{ public: typedef std::function ThreadInitCallback_; EventLoopThreadPool(Eventloop*); ~EventLoopThreadPool(); void setThreadNum(int num){ numThreads_=num; } void start(const ThreadInitCallback_&cb=ThreadInitCallback_()); Eventloop*getNextLoop(); private: Eventloop* baseloop_;//与Acceptor所属的eventloop相同 bool started_;//是否开启线程池 int numThreads_;//线程数 int next_;//新连接到来Eventloop对应的下标 //ptr_vector析构的时候会析构自己开辟出来的存放指针的空间,同时析构指针本身指向的空间而一般容器不会析构指针本身指向的空间 boost::ptr_vector threads_; std::vector loops_; }; } } #endif
需要注意的是在多线程服务器中为了保证线程安全,跨线程调用函数的时候必须使用跨线程接口函数runInLoop,关键几步如下:
//非线程安全,只能在本线程调用 loop_n->runInLoop(std::bind(&TcpConnection::connectEstablish,con closeCallback_(guardThis); // 调用TcpServer::removeConnection loop_->runInLoop(std::bind(&TcpServer::removeConnectionInloop,th loop_io->queueInLoop(std::bind(&TcpConnection::connectDestory,conn));
值得一提的是虽然是多线程操作但是,每个线程中拥有的数据如EventLoop中timerQueue,每个线程中持有的TcpConnection,每个connection中缓冲区都是只能在其所属线程执行的不能跨线程调用,跨线程调用也没有意义。在muduo中可以跨线程调用的成员函数都已经明确标注出来了。这样做的好处是无需加锁,服务器的性能更高。
四、TcpServer接收新连接,断开连接TcpServer主要负责连接的建立以及连接的断开。TcpServer还包含了一个TcpConnection列表,TcpConnection与Acceptor类似,有两个重要的数据成员,Socket与Channel。TcpServer是用户直接使用的,生命期可以由用户直接控制,用户只需要设置好callback,在调用start成员函数即可。
TcpServer内部使用Acceptor来获取新得到的连接fd,它保存用户提供的ConectionCallback_,MessageCallback_,在新建TcpConnection的时候会原封不动的传给后者,TcpServer持有目前存活的TcpConnection的shared_ptr(定义为TcpConnectionPtr),因为TcpConnection对象的生命期是模糊的,用户也可以持有TcpConnectionPtr。
TcpServer的接口如下:
#ifndef TCPSERVER_H #define TCPSERVER_H #include "Eventloop.h" #include "Acceptor.h" #include "Callbacks.h" #include "TcpConnection.h" #include
建立连接时序图:
其中Channel::handleEvent()的触发条件时listening socket可读,表明有新的连接到来,TcpServer会为新的连接创建对应的TcpConnection对象。
每个TcpConnection对象都有一个名字,这个名字是有所属的TcpServer创建TcpConnection对象时候生成的,名字是ConnectionMap的key。在新连接到达的时候Acceptor会回调TcpServer中的newConnection,后者会从线程池中按照轮询的方式选出一个线程Loop创建TcpConnection对象conn并将他加入到ConnectionMap中,设置好callback_,然后在Loop对应的线程中使用conn->connectEstablished()其中会调用用户提供的ConnectionCallback。
代码如下:
//非线程安全,只能在本线程调用
void TcpServer::newConnection(int sockfd,const InetAddress&peerAddr){
loop_->assertInLoopThread();
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextId_);
++nextId_;
std::string connName = name_ + buf;
// name_=connName;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
Eventloop*loop_n=eventLoopThreadPool_->getNextLoop();
InetAddress localAddr=InetAddress((sockets::getLocalAddr(sockfd)));
TcpConnectionPtr conn(new TcpConnection(loop_n,connName,sockfd,localAddr,peerAddr));
// LOG_TRACE << "[1] usecount=" << conn.use_count();
connections_[connName]=conn;
// LOG_TRACE << "[2] usecount=" << conn.use_count();
conn->setMessageCallback(messageCallback_);
conn->setConnectionCallback(connectionCallback_);
conn->setCloseCallback(std::bind(&TcpServer::removeConnection,this,std::placeholders::_1));
conn->setWriteCompleteCallback(writeCompleteCallback_);
// conn->connectEstablish();//将TcpConnection的channel加入poll中监听,同时执行connectcallback
loop_n->runInLoop(std::bind(&TcpConnection::connectEstablish,conn));
// LOG_TRACE << "[5] usecount=" << conn.use_count();
}
void TcpConnection::connectEstablish(){
loop_->assertInLoopThread();
assert(state_==KConnecting);
setState(KConnected);
// LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
channel_->tie(shared_from_this());
channel_->enableReading();
connectionback_(shared_from_this());
// LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}
断开连接时序图:
muduo只有一种关闭连接的方式:被动关闭。即对方先关闭连接,本地read返回0,触发关闭逻辑调用handleclose具体流程图如下:
void TcpConnection::handleClose(){
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << state_;
assert(state_ == KConnected || state_ == KDisconnecting);
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(KDisconnected);
channel_->disableAll();
TcpConnectionPtr guardThis(shared_from_this());
connectionback_(guardThis); // 这一行,可以不调用
// LOG_TRACE << "[7] usecount=" << guardThis.use_count();
// must be the last line
closeCallback_(guardThis); // 调用TcpServer::removeConnection
// LOG_TRACE << "[11] usecount=" << guardThis.use_count();
}
void TcpServer::removeConnection(const TcpConnectionPtr& conn){
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInloop,this,conn));
}
void TcpServer::removeConnectionInloop(const TcpConnectionPtr& conn){
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
// LOG_TRACE << "[8] usecount=" << conn.use_count();
//因为channel中还有一个由weak_ptr升级的shared_ptr所以删除这个conn的引用计数也不会变成0
size_t n=connections_.erase(conn->name());
assert(n==1);
// LOG_TRACE << "[9] usecount=" << conn.use_count();
Eventloop*loop_io=conn->getLoop();
//在channel->handlevent后shared_ptr变成weak_ptr引用计数减一,故在此之前要添加一个引用计数
loop_io->queueInLoop(std::bind(&TcpConnection::connectDestory,conn));
// LOG_TRACE << "[10] usecount=" << conn.use_count();
}
void TcpConnection::connectDestory(){
loop_->assertInLoopThread();
if(state_==KConnected){
setState(KDisconnected);
connectionback_(shared_from_this());
}
channel_->remove();
}
TcpServer断开连接是由TcpConnection发起的,具体步骤见多线程池以及这块章节。ioLoop以及loop_的切换都是在连接的断开以及建立的阶段,不影响正常业务。
五 TcpConnectionTcpConnection是muduo中唯一一个用share_ptr来管理的对象,也是唯一一个继承于_enable_share_from的class,这源于其模糊的生命期。TcpConnection是一次Tcp连接,它是不可再生的,一旦连接断开这个对象就没有什么用了。
TcpConnection的生命期管理关键在于在其建立连接的时候将TcpConnection赋值给了对应Channel的weakPtr,这样可以在事件处理之前将weakptr升级成share_ptr从而在TcpServer中map删除TcpConnectionPtr时候该对象不会销毁,因为TcpConnection对应的channel也一直保留到handleEvent之后。
void TcpConnection::connectDestory(){
loop_->assertInLoopThread();
if(state_==KConnected){
setState(KDisconnected);
connectionback_(shared_from_this());
}
channel_->remove();
}
void Channel::handleEvent(Timestamp time){
std::shared_ptr guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
// LOG_TRACE << "[6] usecount=" << guard.use_count();
handleEeventWithGuard(time);
// LOG_TRACE << "[12] usecount=" << guard.use_count();
}
}
else
{
handleEeventWithGuard(time);
}
}
总结
TcpServer,TcpConnection,EventLoopThread这几个类紧密联系,需要结合一起来进行阅读源码,单纯看一个难以理解!



