- 简述
- 1.线程池
- 2.RPC
- 2.1RpcServer
- 2.2RpcClient
- 3.测试
1.本文主要由三个部分,第一个是线程池的实现,第二个是RPC的实现,第三个是一些测试的代码。
2.ROC网络的部分使用我另一篇文章的网络服务实现,所以这里的RPC实现完全没有一行关于网络的代码。
3.因为使用的是现成的网络轮子,很多地方的接口也不得不受限于其提供的接口。
这个线程池是我直接拿来别人的改改,来自link.
threadpool.h文件
class CThreadPool
{
private:
std::vector MdPool; // 线程池
std::queue> MdTasks; // 提交的任务队列
std::mutex MdQueueMutex; // 队列的锁
std::condition_variable MdQueueCondition; // 队列的条件变量
std::atomic MdIsStop; // 队列停止时使用
public:
CThreadPool();
~CThreadPool();
void MfStart(size_t threads = 5);
bool MfIsStop() { return MdIsStop; };
private:
void MfTheadFun();
public:
template
auto MfEnqueue(F&& f, Args&&... args) -> std::future::type>;
};
// 后置返回类型,提取出参数F的返回值类型
// 模板成员需要写在,h中
template
auto CThreadPool::MfEnqueue(F&& f, Args&&... args)
-> std::future::type>
{
using return_type = typename std::result_of::type;
auto task = std::make_shared>(
std::bind(std::forward(f), std::forward(args)...)
);
std::future res = task->get_future();
{
std::unique_lock lock(MdQueueMutex);
if (MdIsStop)
printf("MfEnqueue on MdIsStopped ThreadPool");
MdTasks.emplace([task]() { (*task)(); });
}
MdQueueCondition.notify_one();
return res;
}
threadpool.cpp文件
CThreadPool::CThreadPool() :MdIsStop(false) {}
CThreadPool::~CThreadPool()
{
MdIsStop = true;
MdQueueCondition.notify_all();
for (std::thread& worker : MdPool)
worker.join();
}
void CThreadPool::MfStart(size_t threads) // 线程不应该在构造函数中启动,因为这些线程使用了数据成员
{
for (size_t i = 0; i < threads; ++i)
MdPool.push_back(std::thread(&CThreadPool::MfTheadFun, this));
}
void CThreadPool::MfTheadFun()
{
while (1)
{
std::function task; // 要执行的任务
{
std::unique_lock lock(MdQueueMutex);
MdQueueCondition.wait(lock, [this] { return this->MdIsStop || !this->MdTasks.empty(); });
if (this->MdIsStop && this->MdTasks.empty())
return;
task = std::move(this->MdTasks.front());
this->MdTasks.pop();
}
task();
}
}
2.RPC
RPC的server和client是写在同一个文件里的,但是这里分开来说明。
这里是server和client公用的一些东西。
过程的函数签名应该像RemoteProc那样,比如:std::shared_ptr
RpcMsg的定义就像unp卷二里描述的那样,只是一个指针,这个地址上的数据有多长,如何解释,完全自定义。
RpcCS.h
// 所有远程调用的函数应当遵循此接口 // 返回值是一个智能指针,指向一个char[] // 参数是一个智能指针,指向一个char[] // 使用智能指针而不是直接使用char*,是因为很多地方都是跨线程传递的,方便在调用中的内存管理 // 不论是返回值还是参数,都在void的基础上自定义结构解析 typedef std::function < std::shared_ptr2.1RpcServer(std::shared_ptr ) > RemoteProc; enum RpcNetMsgCmd { RpcProc_NOON = 0 // 若CNetMsgHead::MdCmd为该值, // 其余自定义的过程号都应该大于0 }; struct CNetMsgHead // 这个结构是搬过来说明的,他的定义不放在这里 { int MdLen; // 该包总长度 int MdCmd; // 该包执行的操作 CNetMsgHead() { MdLen = sizeof(CNetMsgHead); MdCmd = -1; // 该值为-1时默认为心跳包 } }; struct RpcMsg :public CNetMsgHead { // CNetMsgHead::MdLen 该成员依旧代表整个包的长度 // CNetMsgHead::MdCmd 该成员不再代表某个操作,而是直接代表要调用的那个过程号CallNo void* MdData; // 数据指针 // 在server中,收到该结构MdData表示参数,发送该结构MdData表示返回值 // 在client中,收到该结构MdData表示返回值,发送该结构MdData表示参数 // 发送时不设置该成员,而是直接写数据到缓冲区 // 从缓冲区取出时使用这个成员 RpcMsg() { MdData = nullptr; MdCmd = 0; } };
RpcCS.h
MdCallList:服务器要执行的过程调用,都存放在这里,存储了:
①过程号
②对应的过程函数地址
③该过程返回值的长度
④该过程参数的长度
MdCallListMutex:因为主线程和复写的网络消息处理线程都会访问该列表,这里使用了读写锁。
MdProcRet:每当有一个过程调用被放进线程池执行,代表其结果的对象就会放进这个表,存储了:
①对应客户端连接的CSocketObj*对象
②代表执行过程的过程号
③可以异步取得执行结果的std::future
MdProcRetMutex:MdProcRet是会被异步访问的见,MfCallRetrun
MfStart:该函数先启动一个线程池,从客户端递送过来的远程调用会被送进该线程池执行
MfCallRetrun:这是一个线程,它会在MftSart中被丢到线程池中执行,轮询MdProcRet中是否有结果可用,并把结果写回客户端,相当于MdProcRet的消费者。
MfVNetMsgDisposeFun:是基类提供的网络消息处理函数,当收到消息,会调用这个虚函数来处理,另一个身份是MdProcRet的生产者。
class CRpcServer :private CServiceNoBlock
{
private:
CThreadPool MdThreadPool; // 执行过程时的线程池
std::map> MdCallList; // 注册的过程表,分别描述过程号、过程函数,参数的长度、返回值的长度
std::shared_mutex MdCallListMutex; // 该表的互斥元
std::map>>> MdProcRet; // 过程放入线程池执行时会返回一个std::future,以便后续异步取得该过程的返回值
// 分别描述客户端连接、执行的过程号、可以取得返回值的future对象
std::shared_mutex MdProcRetMutex; // 过程结果集的锁
public:
CRpcServer(int HeartBeatTime = 300, int ServiceMaxPeoples = 100, int DisposeThreadNums = 1);
virtual ~CRpcServer();
void MfStart(const char* ip, unsigned short port, int threadNums = 3); // 启动收发线程和线程池,threadNums代表线程池线程数量
int MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen); // 注册一个过程
int MfRemoveCall(int CallNo); // 移除一个过程
private:
void MfCallRetrun();
virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};
RpcCS.cpp
这里主要说明MfVNetMsgDisposeFun的思路:
①从msg参数中取得要调用的过程号,到MdCallList中寻找,如果找不到对应过程,就给客户端发回一个过程号为RpcProc_NOON(0)的包,表示该过程不存在,然后就没事了。
②把可以异步取得结果的future对象放入队列,生产者。
CRpcServer::CRpcServer(int HeartBeatTime, int ServiceMaxPeoples, int DisposeThreadNums) :
CServiceNoBlock(HeartBeatTime, ServiceMaxPeoples, DisposeThreadNums)
{
}
CRpcServer::~CRpcServer()
{
}
void CRpcServer::MfStart(const char* ip, unsigned short port, int threadNums)
{
MdThreadPool.MfStart(threadNums+1); // 线程池启动
MdThreadPool.MfEnqueue(&CRpcServer::MfCallRetrun, this);// 给对应客户端写回结果的线程丢尽线程池执行
return CServiceNoBlock::Mf_NoBlock_Start(ip, port); // 网络收发处理启动
}
int CRpcServer::MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen)
{
{
std::lock_guard write_lock(MdCallListMutex);
if (MdCallList.find(CallNo) != MdCallList.end())
{
printf("RempteProcReg CallNo <%d> already existed!n", CallNo);
LogFormatMsgAndSubmit(std::this_thread::get_id(), ERROR_FairySun, "RempteProcReg CallNo <%d> already existed!n", CallNo);
return -1;
}
MdCallList[CallNo] = std::make_tuple(Call, ArgLen, RetLen);
}
return 0;
}
int CRpcServer::MfRemoveCall(int CallNo)
{
{
std::lock_guard write_lock(MdCallListMutex);
auto it = MdCallList.find(CallNo);
if (it != MdCallList.end())
MdCallList.erase(it);
}
return 0;
}
void CRpcServer::MfCallRetrun()
{
while (!MdProcRet.empty() || !MdThreadPool.MfIsStop())
{
RpcMsg ret;
// for循环中 执行完成的远程调用 的CSocketObj*会被加入该队列,结束后统一从MdProcRet中移除
std::vector removelist;
// 遍历MdProcRet,如果有一个过程可以取得结果,就把结果发回对应的socket
for (auto it = MdProcRet.begin(); it != MdProcRet.end(); ++it)
{
if (std::get<1>(it->second).valid()) // 如果结果可用
{
int procNo = std::get<0>(it->second);
int procRetSize = std::get<2>(MdCallList[procNo]);
ret.MdLen = sizeof(CNetMsgHead) + procRetSize;
ret.MdCmd = procNo;
std::shared_ptr data = std::get<1>(it->second).get();
it->first->MfDataToBuffer((char*)&ret, sizeof(CNetMsgHead)); // 先写包头
it->first->MfDataToBuffer(data.get(), procRetSize); // 再写数据
removelist.push_back(it->first);
}
}
// 将执行完成的远程调用统一移除
{
std::unique_lock write_lock(MdProcRetMutex);
for (auto it = removelist.begin(); it != removelist.end(); ++it)
{
if (MdProcRet.find(*it) != MdProcRet.end())
MdProcRet.erase(*it);
}
}
std::this_thread::sleep_for(std::chrono::seconds(1)); // 暂停一秒防止执行过快
}
}
void CRpcServer::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{
// 注册列表中没有找到对应的过程号,发回一个为0的消息,表示找不到对应过程
int flag = false;
RpcMsg ret;
{
std::shared_lock read_lock(MdCallListMutex);
if (MdCallList.find(msg->MdCmd) != MdCallList.end())
flag = true;
}
if (flag == false)
{
ret.MdLen = sizeof(RpcMsg);
ret.MdCmd = RpcProc_NOON;
cli->MfDataToBuffer((char*)&ret, ret.MdLen);
}
else // 否则就是找到了对应的过程,保存客户端对象和对应的future,放进结果表
{
int arglen = std::get<1>(MdCallList[msg->MdCmd]);
std::shared_ptr buf(new char[arglen]);
strncpy(buf.get(), ((char*)msg) + sizeof(CNetMsgHead), arglen);
{
std::unique_lock write_lock(MdProcRetMutex);
MdProcRet[cli] = std::make_tuple
(
msg->MdCmd,
MdThreadPool.MfEnqueue(std::get<0>(MdCallList[msg->MdCmd]), buf)
);
}
}
}
2.2RpcClient
RpcCs.h
没有任何新的数据成员,只是简单封装了下CClientlinkManage的接口
MfVNetMsgDisposeFun:基类提供的网络消息处理函数,每收到一条消息就会调用一次,这里虽然复写了它,但是复写成了空函数,它什么都不做,因为客户端的远程过程调用应当是阻塞的。
MfRemote:使用该函数来发起一个远程过程调用,第二三四参数分别指明了调用过程号、过程需要的数据地址、数据的长度。几乎全部的逻辑都集中在该函数中:
①声明一个RpcMsg结构,填写包头结构,然后把包头和数据分别写到对应的套接字里。
②循环检查是否有服务端的数据发回。
③有数据发回,检查服务器返回的过程号是否正确,不正确和0都会导致该函数返回空指针来表示失败。
④正确就从套接字中取出数据,然后写到一个智能指针标识的空间中返回。
class CRpcClient :private CClientlinkManage
{
private:
public:
CRpcClient();
virtual ~CRpcClient();
void MfStart(); // 启动收发线程
int MfConnectRpcServer(std::string linkname, const char* ip, unsigned short port); // 连接Rpc服务
void MfCloseRpclink(std::string linkname); // 关闭和Rpc的连接
std::shared_ptr MfRemote(std::string linkname, int CallNo, void* data, int DataSize); // 远程过程调用,阻塞等待
private:
virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};
RpcCs.cpp
CRpcClient::CRpcClient():
CClientlinkManage()
{
}
CRpcClient::~CRpcClient()
{
}
void CRpcClient::MfStart()
{
CClientlinkManage::MfStart();
}
int CRpcClient::MfConnectRpcServer(std::string linkname, const char* ip, unsigned short port)
{
return CClientlinkManage::MfCreateAddlink(linkname, ip, port);
}
void CRpcClient::MfCloseRpclink(std::string linkname)
{
CClientlinkManage::MfCloselink(linkname);
}
std::shared_ptr CRpcClient::MfRemote(std::string linkname, int CallNo, void* data, int DataSize)
{
RpcMsg msg;
msg.MdLen = sizeof(CNetMsgHead) + DataSize;
msg.MdCmd = CallNo;
CClientlinkManage::MfSendData(linkname, (char*)&msg, sizeof(CNetMsgHead)); // 先写包头
CClientlinkManage::MfSendData(linkname, (char*)data, DataSize); // 再写数据
while (1)
{
if (!CClientlinkManage::MfHasMsg(linkname)) // 如果没有数据
{
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
else
{
const char * buff = CClientlinkManage::MfGetRecvBufferP(linkname);
if ( ((RpcMsg*)buff)->MdCmd == RpcProc_NOON ) // 收到了为0的callno
return nullptr;
else if ( ((RpcMsg*)buff)->MdCmd != CallNo ) // 收到的callno和发出去的callno不一样
return nullptr;
else
{
int retsize = ((RpcMsg*)buff)->MdLen - sizeof(CNetMsgHead); // 计算返回的数据长度
char* ret = new char[retsize]; // 申请空间
strncpy(ret, ((char*)buff) + sizeof(CNetMsgHead) , retsize); // 复制到新申请的空间
CClientlinkManage::MfPopFrontMsg(linkname); // 缓冲区的消息弹出
return std::shared_ptr(ret); // 转换成智能指针返回
}
}
}
}
void CRpcClient::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{
}
3.测试
client.cpp
int main()
{
FairySunOfNetbaseStart(); // https://blog.csdn.net/qq_43082206/article/details/110383165
int i = 5;
CRpcClient c;
c.MfStart();
c.MfConnectRpcServer("rpc", "118.31.75.171", 4567);
for(int i = 0; i < 10; ++i)
printf("remote ret:%dn", *(int*)c.MfRemote("rpc", 2, (void*)&i, sizeof(int)).get());
for (int i = 0; i < 3; ++i)
printf("remote ret:%dn", *(int*)c.MfRemote("rpc", 1, (void*)&i, sizeof(int)).get());
getchar();
FairySunOfNetbaseOver();
return 0;
}
server.cpp
std::shared_ptrtestf1(std::shared_ptr a) { printf("test11111n"); char* ret = new char[sizeof(int)]; int s = *((int*)a.get()) + 1; *(int*)ret = s; std::this_thread::sleep_for(std::chrono::seconds(5)); return std::shared_ptr (ret); } std::shared_ptr testf2(std::shared_ptr a) { printf("test22222n"); char* ret = new char[sizeof(int)]; *(int*)ret = *(int*)a.get() + 2; return std::shared_ptr (ret); } int main() { FairySunOfNetbaseStart(); // https://blog.csdn.net/qq_43082206/article/details/110383165 CRpcServer s; s.MfStart(0, 4567); s.MfRegCall(1, testf1, sizeof(int), sizeof(int)); s.MfRegCall(2, testf2, sizeof(int), sizeof(int)); s.MfRegCall(2, testf2, sizeof(int), sizeof(int)); //s.MfRemoveCall(1); //s.MfRemoveCall(2); getchar(); FairySunOfNetbaseOver(); return 0; }
2021年11月16日16:19:38



