栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

一个c++RPC实现

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

一个c++RPC实现

一个c++RPC实现
  • 简述
  • 1.线程池
  • 2.RPC
    • 2.1RpcServer
    • 2.2RpcClient
  • 3.测试

简述

  1.本文主要由三个部分,第一个是线程池的实现,第二个是RPC的实现,第三个是一些测试的代码。
  2.ROC网络的部分使用我另一篇文章的网络服务实现,所以这里的RPC实现完全没有一行关于网络的代码。
  3.因为使用的是现成的网络轮子,很多地方的接口也不得不受限于其提供的接口。

1.线程池

  这个线程池是我直接拿来别人的改改,来自link.
  threadpool.h文件

class CThreadPool 
{
private:
    std::vector            MdPool;             // 线程池
    std::queue>   MdTasks;            // 提交的任务队列
    std::mutex                          MdQueueMutex;       // 队列的锁
    std::condition_variable             MdQueueCondition;   // 队列的条件变量
    std::atomic                   MdIsStop;           // 队列停止时使用
public:
    CThreadPool();
    ~CThreadPool();
    void MfStart(size_t threads = 5);    
    bool MfIsStop() { return MdIsStop; };
private:
    void MfTheadFun();
public:
    template
    auto MfEnqueue(F&& f, Args&&... args) -> std::future::type>;
};


// 后置返回类型,提取出参数F的返回值类型
// 模板成员需要写在,h中
template
auto CThreadPool::MfEnqueue(F&& f, Args&&... args)
-> std::future::type>
{
    using return_type = typename std::result_of::type;

    auto task = std::make_shared>(
        std::bind(std::forward(f), std::forward(args)...)
        );

    std::future res = task->get_future();
    {
        std::unique_lock lock(MdQueueMutex);
        if (MdIsStop)
            printf("MfEnqueue on MdIsStopped ThreadPool");
        MdTasks.emplace([task]() { (*task)(); });
    }
    MdQueueCondition.notify_one();
    return res;
}

  threadpool.cpp文件

CThreadPool::CThreadPool() :MdIsStop(false) {}

CThreadPool::~CThreadPool()
{
    MdIsStop = true;
    MdQueueCondition.notify_all();
    for (std::thread& worker : MdPool)
        worker.join();
}

void CThreadPool::MfStart(size_t threads)       // 线程不应该在构造函数中启动,因为这些线程使用了数据成员
{
    for (size_t i = 0; i < threads; ++i)
        MdPool.push_back(std::thread(&CThreadPool::MfTheadFun, this));
}

void CThreadPool::MfTheadFun()
{
    while (1)
    {
        std::function task;     // 要执行的任务
        {
            std::unique_lock lock(MdQueueMutex);
            MdQueueCondition.wait(lock, [this] { return this->MdIsStop || !this->MdTasks.empty(); });
            if (this->MdIsStop && this->MdTasks.empty())
                return;
            task = std::move(this->MdTasks.front());
            this->MdTasks.pop();
        }
        task();
    }
}
2.RPC

  RPC的server和client是写在同一个文件里的,但是这里分开来说明。
  这里是server和client公用的一些东西。
  过程的函数签名应该像RemoteProc那样,比如:std::shared_ptr testf2(std::shared_ptr a)
  RpcMsg的定义就像unp卷二里描述的那样,只是一个指针,这个地址上的数据有多长,如何解释,完全自定义。

  RpcCS.h

// 所有远程调用的函数应当遵循此接口
// 返回值是一个智能指针,指向一个char[]
// 参数是一个智能指针,指向一个char[]
// 使用智能指针而不是直接使用char*,是因为很多地方都是跨线程传递的,方便在调用中的内存管理
// 不论是返回值还是参数,都在void的基础上自定义结构解析
typedef std::function
<
	std::shared_ptr (std::shared_ptr)
> RemoteProc;

enum RpcNetMsgCmd
{
	RpcProc_NOON = 0	// 若CNetMsgHead::MdCmd为该值,
	// 其余自定义的过程号都应该大于0
};

struct CNetMsgHead			// 这个结构是搬过来说明的,他的定义不放在这里
{
	int MdLen;		// 该包总长度
	int MdCmd;		// 该包执行的操作
	CNetMsgHead()
	{
		MdLen = sizeof(CNetMsgHead);
		MdCmd = -1;				// 该值为-1时默认为心跳包
	}
};

struct RpcMsg :public CNetMsgHead
{
	// CNetMsgHead::MdLen 该成员依旧代表整个包的长度
	// CNetMsgHead::MdCmd 该成员不再代表某个操作,而是直接代表要调用的那个过程号CallNo
	void* MdData;	// 数据指针
					// 在server中,收到该结构MdData表示参数,发送该结构MdData表示返回值
					// 在client中,收到该结构MdData表示返回值,发送该结构MdData表示参数
					// 发送时不设置该成员,而是直接写数据到缓冲区
					// 从缓冲区取出时使用这个成员
	RpcMsg()
	{
		MdData = nullptr;
		MdCmd = 0;
	}
};
2.1RpcServer

  RpcCS.h
    MdCallList:服务器要执行的过程调用,都存放在这里,存储了:
      ①过程号
      ②对应的过程函数地址
      ③该过程返回值的长度
      ④该过程参数的长度
    MdCallListMutex:因为主线程和复写的网络消息处理线程都会访问该列表,这里使用了读写锁。
    MdProcRet:每当有一个过程调用被放进线程池执行,代表其结果的对象就会放进这个表,存储了:
      ①对应客户端连接的CSocketObj*对象
      ②代表执行过程的过程号
      ③可以异步取得执行结果的std::future>
    MdProcRetMutex:MdProcRet是会被异步访问的见,MfCallRetrun
    MfStart:该函数先启动一个线程池,从客户端递送过来的远程调用会被送进该线程池执行
    MfCallRetrun:这是一个线程,它会在MftSart中被丢到线程池中执行,轮询MdProcRet中是否有结果可用,并把结果写回客户端,相当于MdProcRet的消费者。
    MfVNetMsgDisposeFun:是基类提供的网络消息处理函数,当收到消息,会调用这个虚函数来处理,另一个身份是MdProcRet的生产者。

class CRpcServer :private CServiceNoBlock
{
private:
	CThreadPool											MdThreadPool;		// 执行过程时的线程池
	std::map>		MdCallList;			// 注册的过程表,分别描述过程号、过程函数,参数的长度、返回值的长度
	std::shared_mutex									MdCallListMutex;	// 该表的互斥元
	std::map>>>	MdProcRet;		// 过程放入线程池执行时会返回一个std::future,以便后续异步取得该过程的返回值
																									// 分别描述客户端连接、执行的过程号、可以取得返回值的future对象
	std::shared_mutex																MdProcRetMutex; // 过程结果集的锁
																			
public:
	CRpcServer(int HeartBeatTime = 300, int ServiceMaxPeoples = 100, int DisposeThreadNums = 1);
	virtual ~CRpcServer();
	void MfStart(const char* ip, unsigned short port, int threadNums = 3);	// 启动收发线程和线程池,threadNums代表线程池线程数量
	int MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen);		// 注册一个过程
	int MfRemoveCall(int CallNo);											// 移除一个过程
private:
	void MfCallRetrun();
	virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};

  RpcCS.cpp
    这里主要说明MfVNetMsgDisposeFun的思路:
      ①从msg参数中取得要调用的过程号,到MdCallList中寻找,如果找不到对应过程,就给客户端发回一个过程号为RpcProc_NOON(0)的包,表示该过程不存在,然后就没事了。
      ②把可以异步取得结果的future对象放入队列,生产者。

CRpcServer::CRpcServer(int HeartBeatTime, int ServiceMaxPeoples, int DisposeThreadNums) :
	CServiceNoBlock(HeartBeatTime, ServiceMaxPeoples, DisposeThreadNums)
{

}

CRpcServer::~CRpcServer()
{

}

void CRpcServer::MfStart(const char* ip, unsigned short port, int threadNums)
{
	MdThreadPool.MfStart(threadNums+1);						// 线程池启动
	MdThreadPool.MfEnqueue(&CRpcServer::MfCallRetrun, this);// 给对应客户端写回结果的线程丢尽线程池执行
	return CServiceNoBlock::Mf_NoBlock_Start(ip, port);		// 网络收发处理启动
}

int CRpcServer::MfRegCall(int CallNo, RemoteProc Call, int ArgLen, int RetLen)
{
	{
		std::lock_guard write_lock(MdCallListMutex);
		if (MdCallList.find(CallNo) != MdCallList.end())
		{
			printf("RempteProcReg CallNo <%d> already existed!n", CallNo);
			LogFormatMsgAndSubmit(std::this_thread::get_id(), ERROR_FairySun, "RempteProcReg CallNo <%d> already existed!n", CallNo);
			return -1;
		}
		MdCallList[CallNo] = std::make_tuple(Call, ArgLen, RetLen);
	}
	return 0;
}

int CRpcServer::MfRemoveCall(int CallNo)
{
	{
		std::lock_guard write_lock(MdCallListMutex);
		auto it = MdCallList.find(CallNo);
		if (it != MdCallList.end())
			MdCallList.erase(it);
	}
	return 0;
}

void CRpcServer::MfCallRetrun()
{
	while (!MdProcRet.empty() || !MdThreadPool.MfIsStop())
	{
		RpcMsg ret;
		// for循环中 执行完成的远程调用 的CSocketObj*会被加入该队列,结束后统一从MdProcRet中移除
		std::vector removelist;

		// 遍历MdProcRet,如果有一个过程可以取得结果,就把结果发回对应的socket
		for (auto it = MdProcRet.begin(); it != MdProcRet.end(); ++it)
		{
			if (std::get<1>(it->second).valid())		// 如果结果可用
			{
				int procNo = std::get<0>(it->second);
				int procRetSize = std::get<2>(MdCallList[procNo]);
				ret.MdLen = sizeof(CNetMsgHead) + procRetSize;
				ret.MdCmd = procNo;
				std::shared_ptr data = std::get<1>(it->second).get();
				it->first->MfDataToBuffer((char*)&ret, sizeof(CNetMsgHead));	// 先写包头
				it->first->MfDataToBuffer(data.get(), procRetSize);				// 再写数据
				removelist.push_back(it->first);
			}
		}

		// 将执行完成的远程调用统一移除
		{
			std::unique_lock write_lock(MdProcRetMutex);
			for (auto it = removelist.begin(); it != removelist.end(); ++it)
			{
				if (MdProcRet.find(*it) != MdProcRet.end())
					MdProcRet.erase(*it);
			}
		}
				
		std::this_thread::sleep_for(std::chrono::seconds(1));	// 暂停一秒防止执行过快
	}
}

void CRpcServer::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{
	// 注册列表中没有找到对应的过程号,发回一个为0的消息,表示找不到对应过程
	int flag = false;
	RpcMsg ret;
	{
		std::shared_lock read_lock(MdCallListMutex);
		if (MdCallList.find(msg->MdCmd) != MdCallList.end())
			flag = true;
	}
	if (flag == false)
	{
		ret.MdLen = sizeof(RpcMsg);
		ret.MdCmd = RpcProc_NOON;
		cli->MfDataToBuffer((char*)&ret, ret.MdLen);
	}
	else		// 否则就是找到了对应的过程,保存客户端对象和对应的future,放进结果表
	{
		int arglen = std::get<1>(MdCallList[msg->MdCmd]);
		std::shared_ptr buf(new char[arglen]);
		strncpy(buf.get(), ((char*)msg) + sizeof(CNetMsgHead), arglen);
		{
			std::unique_lock write_lock(MdProcRetMutex);
			MdProcRet[cli] = std::make_tuple
			(
				msg->MdCmd,
				MdThreadPool.MfEnqueue(std::get<0>(MdCallList[msg->MdCmd]), buf)
			);
		}
	}
}
2.2RpcClient

  RpcCs.h
    没有任何新的数据成员,只是简单封装了下CClientlinkManage的接口
    MfVNetMsgDisposeFun:基类提供的网络消息处理函数,每收到一条消息就会调用一次,这里虽然复写了它,但是复写成了空函数,它什么都不做,因为客户端的远程过程调用应当是阻塞的。
    MfRemote:使用该函数来发起一个远程过程调用,第二三四参数分别指明了调用过程号、过程需要的数据地址、数据的长度。几乎全部的逻辑都集中在该函数中:
      ①声明一个RpcMsg结构,填写包头结构,然后把包头和数据分别写到对应的套接字里。
      ②循环检查是否有服务端的数据发回。
      ③有数据发回,检查服务器返回的过程号是否正确,不正确和0都会导致该函数返回空指针来表示失败。
      ④正确就从套接字中取出数据,然后写到一个智能指针标识的空间中返回。

class CRpcClient :private CClientlinkManage
{
private:
public:
	CRpcClient();
	virtual ~CRpcClient();
	void MfStart();																					// 启动收发线程
	int MfConnectRpcServer(std::string linkname, const char* ip, unsigned short port);				// 连接Rpc服务
	void MfCloseRpclink(std::string linkname);														// 关闭和Rpc的连接
	std::shared_ptr MfRemote(std::string linkname, int CallNo, void* data, int DataSize);	// 远程过程调用,阻塞等待
private:
	virtual void MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid);
};

  RpcCs.cpp

CRpcClient::CRpcClient():
	CClientlinkManage()
{

}

CRpcClient::~CRpcClient()
{

}

void CRpcClient::MfStart()
{
	CClientlinkManage::MfStart();
}

int CRpcClient::MfConnectRpcServer(std::string linkname, const char* ip, unsigned short port)
{
	return CClientlinkManage::MfCreateAddlink(linkname, ip, port);
}

void CRpcClient::MfCloseRpclink(std::string linkname)
{
	CClientlinkManage::MfCloselink(linkname);
}

std::shared_ptr CRpcClient::MfRemote(std::string linkname, int CallNo, void* data, int DataSize)
{
	RpcMsg msg;
	msg.MdLen = sizeof(CNetMsgHead) + DataSize;
	msg.MdCmd = CallNo;
	CClientlinkManage::MfSendData(linkname, (char*)&msg, sizeof(CNetMsgHead));	// 先写包头
	CClientlinkManage::MfSendData(linkname, (char*)data, DataSize);				// 再写数据	

	while (1)
	{
		if (!CClientlinkManage::MfHasMsg(linkname))		// 如果没有数据
		{
			std::this_thread::sleep_for(std::chrono::seconds(1));
			continue;
		}
		else
		{
			const char * buff = CClientlinkManage::MfGetRecvBufferP(linkname);
			if (  ((RpcMsg*)buff)->MdCmd == RpcProc_NOON  )						// 收到了为0的callno
				return nullptr;
			else if (  ((RpcMsg*)buff)->MdCmd != CallNo  )						// 收到的callno和发出去的callno不一样
				return nullptr;
			else
			{
				int retsize = ((RpcMsg*)buff)->MdLen - sizeof(CNetMsgHead);		// 计算返回的数据长度
				char* ret = new char[retsize];									// 申请空间
				strncpy(ret, ((char*)buff) + sizeof(CNetMsgHead) , retsize);	// 复制到新申请的空间
				CClientlinkManage::MfPopFrontMsg(linkname);						// 缓冲区的消息弹出
				return std::shared_ptr(ret);							// 转换成智能指针返回
			}
		}
	}
}

void CRpcClient::MfVNetMsgDisposeFun(SOCKET sock, CSocketObj* cli, CNetMsgHead* msg, std::thread::id& threadid)
{

}

3.测试

  client.cpp

int main()
{
	FairySunOfNetbaseStart();				// https://blog.csdn.net/qq_43082206/article/details/110383165
	int i = 5;
	CRpcClient c;
	c.MfStart();
	c.MfConnectRpcServer("rpc", "118.31.75.171", 4567);
	for(int i = 0; i < 10; ++i)
		printf("remote ret:%dn", *(int*)c.MfRemote("rpc", 2, (void*)&i, sizeof(int)).get());
	for (int i = 0; i < 3; ++i)
	printf("remote ret:%dn", *(int*)c.MfRemote("rpc", 1, (void*)&i, sizeof(int)).get());
	getchar();
	FairySunOfNetbaseOver();
	return 0;
}

  server.cpp

std::shared_ptr testf1(std::shared_ptr a)
{
	printf("test11111n");
	char* ret = new char[sizeof(int)];
	int s = *((int*)a.get()) + 1;
	*(int*)ret = s;
	std::this_thread::sleep_for(std::chrono::seconds(5));
	return std::shared_ptr(ret);
}
std::shared_ptr testf2(std::shared_ptr a)
{
	printf("test22222n");
	char* ret = new char[sizeof(int)];
	*(int*)ret = *(int*)a.get() + 2;
	return std::shared_ptr(ret);
}

int main()
{
	FairySunOfNetbaseStart();			// https://blog.csdn.net/qq_43082206/article/details/110383165
	CRpcServer s;
	s.MfStart(0, 4567);
	s.MfRegCall(1, testf1, sizeof(int), sizeof(int));
	s.MfRegCall(2, testf2, sizeof(int), sizeof(int));
	s.MfRegCall(2, testf2, sizeof(int), sizeof(int));
	//s.MfRemoveCall(1);
	//s.MfRemoveCall(2);
	getchar();
	FairySunOfNetbaseOver();
	return 0;
}

                                      2021年11月16日16:19:38

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/511217.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号