实现需求:
c++项目实现RabbitMQ客户端与服务端交互数据,MES信息传输。
# 1.安装RabbitMQ
此处只表示Windows10下的安装,其他Linux之类的安装请搜寻其他资料。
开发工具
visual studio 2015
cmake 3.21.3工具下载网址没有版本要求
RabbitMQ Windows版本地服务器
#(1)RabbitMQ服务器安装
RabbitMQ是要依赖Erlang的,安装之前先装Erlang。
Erlang下载路径官网
RabbitMQ下载路径官网,因为本人下载的是rabbitmq-server-3.9.7.exe,搭建成功并与能与本地项目通讯,所以推荐可以直接下此版本。如图:此版本上说明了是依赖于Erlang/OTP 23.2链接 和 Erlang 24链接的(以免你们难找所以下载链接都贴上)
上述步骤安装完成后;
就可以在浏览器中登录本地刚搭建的RabbitMQ服务器了,默认网址访问http://localhost:15672 进行测试,默认登录账户为:guest / guest。
注意:如果浏览器打不开网址,可能是默认安装RabbitMQ插件节点没有开启,在windows开始栏找到如图cmd打开
输入 开启RabbitMQ节点 命令
rabbitmqctl start_app
#开启RabbitMQ管理模块的插件,并配置到RabbitMQ节点上
rabbitmq-plugins enable rabbitmq_management
如若配置失败,可能安装出错,卸载RabbitMQ服务端和Erlang之后重新安装(先安装Erlang完成后再安装RabbitMQ)
如果需要关闭RabbitMQ节点,可以使用下面的命令:
rabbitmqctl stop_app
#(2)RabbitMQ-c安装
下载拉取源码:RabbitMQ-c,本地懒得配置git的可以直接下载zip包
打开cmake软件,构建自己的vs工程文件。
我用的是vs2015,win32程序,点击Configure按钮配置
点击完成后点击==Generate,按图取消其值的勾选,生成成功。
如果要修改cmake配置,点击File->Delete cache就行了
然后就是生成我们自己软件调用的DLL了。
点击open Project按钮,或者直接上面的build输出文件夹下打开vs工程,
成功后在生成目录下可以看到我们所需要的DLL文件
至此,所有的RabbitMQ配置已经完成,剩下就是如何在我们的项目中用c++语言构建RabbitMQ客户端与本地搭建的服务器通信了。
本地登录
出现了如上界面证明本地服务器是能正常运行的。
# 2.C++ RabbitMQ客户端
网上还有直接用SimpleAmqpClient的例子,但是搭建起来又要花费好多时间,我这里在搜索了大量资料后直接对RabbitMQ-c重新封装了一层供自己调用。
例子:vs c++创建的win32应用台程序
头文件MainTest.h
#pragma once #include#include #include #include #include #include #include "../RabbitMQClient/RabbitmqClient.h" #include using namespace std; #pragma comment(lib,"RabbitmqClient.lib") enum _RABBITMQ_MESSAGE_TYPE { _JSONSTR_Machine_status = 0, //机台状态 _JSONSTR_Alarm_info, //报警信息传输 _JSONSTR_update_Product_info, //生产信息更新传输 }; class Vm { private: CRabbitmqClient *m_rabbitmq_client; string m_HostIP;//IP地址 string m_Port;//端口号 string m_username;//rabbitmq登录用户名 string m_password;//rabbitmq登录密码 public: Vm();//构造函数 ~Vm();//析构函数 void setConnectInfo(string IP, string port, string username, string password); FILE* m_rabbitFile = NULL;//发送失败文件句柄,发送失败保存当前信息,下一次发送成功时重传 bool m_Rab_Isconnect = false;//是否连接 bool Get_connect_status() { return m_Rab_Isconnect; } bool Rab_Connect();//连接RabbitMQ服务器 bool Rab_DisConnect();//断开连接 string GetExePath();//获取程序执行路径 int Producer_Send(_RABBITMQ_MESSAGE_TYPE type, string message);//发送 int Consumer_Rcv(string duquename, vector & vgetmsg, int getnum = 1);//接收 };
main函数所在文件MainTest.cpp
// MainTest.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include "MainTest.h" #include#include #include string Vm::GetExePath() { //获得路径 string strExePath; char szFilePath[MAX_PATH + 1]; GetModuleFileNameA(NULL, szFilePath, MAX_PATH); (strrchr(szFilePath, _T('\')))[1] = 0; strExePath = szFilePath; return strExePath; } Vm::Vm() { m_rabbitmq_client = CRabbitmqClient::GetInstance(); } Vm::~Vm() { delete m_rabbitmq_client; m_rabbitmq_client = NULL; } //转换成utf8格式的字符串,以免发送中文在Rabbitmq服务器上显示乱码 string string_To_UTF8(const std::string & str) { int nwLen = ::MultiByteToWideChar(CP_ACP, 0, str.c_str(), -1, NULL, 0); wchar_t * pwBuf = new wchar_t[nwLen + 1];//一定要加1,不然会出现尾巴 ZeroMemory(pwBuf, nwLen * 2 + 2); ::MultiByteToWideChar(CP_ACP, 0, str.c_str(), str.length(), pwBuf, nwLen); int nLen = ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, NULL, NULL, NULL); char * pBuf = new char[nLen + 1]; ZeroMemory(pBuf, nLen + 1); ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL); std::string retStr(pBuf); delete[]pwBuf; delete[]pBuf; pwBuf = NULL; pBuf = NULL; return retStr; } //转换显示 string UTF8_To_string(const std::string & str) { int nwLen = MultiByteToWideChar(CP_UTF8, 0, str.c_str(), -1, NULL, 0); wchar_t * pwBuf = new wchar_t[nwLen + 1];//一定要加1,不然会出现尾巴 memset(pwBuf, 0, nwLen * 2 + 2); MultiByteToWideChar(CP_UTF8, 0, str.c_str(), str.length(), pwBuf, nwLen); int nLen = WideCharToMultiByte(CP_ACP, 0, pwBuf, -1, NULL, NULL, NULL, NULL); char * pBuf = new char[nLen + 1]; memset(pBuf, 0, nLen + 1); WideCharToMultiByte(CP_ACP, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL); std::string retStr = pBuf; delete[]pBuf; delete[]pwBuf; pBuf = NULL; pwBuf = NULL; return retStr; } void Vm::setConnectInfo(string IP, string port, string username, string password) { m_HostIP = IP;//IP地址 m_Port = port;//端口号 m_username = username;//rabbitmq登录用户名 m_password = password;//rabbitmq登录密码 } bool Vm::Rab_Connect() { if (m_rabbitmq_client) { //设置要连接服务器的ip,端口号,用户名,密码 m_rabbitmq_client->Setconncet_at(m_HostIP, atoi(m_Port.c_str()), m_username, m_password); string err = ""; //连接服务器 if (m_rabbitmq_client->Connect(err) < 0) { cout << "连接RabbitMQ提示:" + err << endl; return false; } cout << "连接RabbitMQ服务器成功" << endl; m_Rab_Isconnect = true; return true; } return false; } bool Vm::Rab_DisConnect() { if (m_rabbitmq_client) { string err = ""; if (m_rabbitmq_client->Disconnect(err) < 0) { cout << "连接RabbitMQ提示:" + err << endl; return false; } cout << "断开连接" << endl; m_Rab_Isconnect = false; return true; } return false; } int Vm::Producer_Send(_RABBITMQ_MESSAGE_TYPE type, string message) { static string tempfile1 = GetExePath() + "\RabproductFailMsg.txt"; static string tempfile2 = GetExePath() + "\RabalarmFailMsg.txt"; static string tempfile3 = GetExePath() + "\RabstatusFailMsg.txt"; string tempfile; string duquename = ""; switch (type) { case _JSONSTR_Machine_status: tempfile = tempfile3; duquename = "Q.EQPT_LOG_D"; break; case _JSONSTR_Alarm_info: tempfile = tempfile2; duquename = "Q.ALARM_D"; break; case _JSONSTR_update_Product_info: tempfile = tempfile1; duquename = "Q.PRODUCT_UPDATE_D"; break; default: break; } #if 1 CQueue queue_temp(duquename); string err = ""; if (m_rabbitmq_client) { if (Get_connect_status()) //连接成功再进行下一步操作 { //声明一个队列 if (m_rabbitmq_client->queue_declare(queue_temp, err) < 0) { cout << "连接RabbitMQ提示:" + err << endl; m_rabbitFile = fopen(tempfile.c_str(), "a+"); string temp = message + "n"; if (m_rabbitFile) { std::fputs(temp.c_str(), m_rabbitFile); std::fflush(m_rabbitFile); std::fclose(m_rabbitFile); } return -1; } else cout << "声明队列成功" << endl; //将序列化之后的二进制消息放到指定路由对应的消息队列中 if (m_rabbitmq_client->publish(message, duquename, err) < 0) { cout << "连接RabbitMQ提示:" + err << endl; m_rabbitFile = fopen(tempfile.c_str(), "a+"); string temp = message + "n"; if (m_rabbitFile) { std::fputs(temp.c_str(), m_rabbitFile); std::fflush(m_rabbitFile); std::fclose(m_rabbitFile); } return -1; } else { cout << "发送内容成功" << endl; if (access(tempfile.c_str(), 0) == 0)//文件存在 { std::ifstream in(tempfile); std::string getmsg; vector vec; while (getline(in, getmsg)) {//逐行读取,遇到换行符 CMessage mg(getmsg); vec.push_back(mg); } in.close(); remove(tempfile.c_str());//删除文件重置 if (vec.size() > 0) { if (m_rabbitmq_client->publish(vec, duquename, err) < 0) { cout << "连接RabbitMQ发送失败信息重发失败:" + err << endl; return -1; } } } } } else { cout << "连接RabbitMQ提示:" + err << endl; m_rabbitFile = fopen(tempfile.c_str(), "a+"); string temp = message + "n"; if (m_rabbitFile) { std::fputs(temp.c_str(), m_rabbitFile); std::fflush(m_rabbitFile); std::fclose(m_rabbitFile); } } //断开连接 //m_rabbitmq_client->__sleep(10); //m_rabbitmq_client->Disconnect(); //return 0; } return 0; #else CRabbitmqClient rb_client(m_HostIP, atoi(m_Port.c_str()), m_username, m_password); //CExchange exchange(exchangename); CQueue queue_temp(duquename); //CRabbitmqClient rb_client("127.0.0.1", 5672, "guest", "guest"); string err = ""; //if (rb_client) { //连接服务器 if (rb_client.Connect(err) < 0) { cout << "连接RabbitMQ提示:" + err << endl; m_rabbitFile = fopen(tempfile.c_str(), "a+"); string temp = message + "n";//方便写入与读取 if (m_rabbitFile) { std::fputs(temp.c_str(), m_rabbitFile); std::fflush(m_rabbitFile); std::fclose(m_rabbitFile); } return -1; } //声明一个交换机 //if (rb_client.exchange_declare(exchange, err) < 0) //{ // return -1; //} //声明一个队列 if (rb_client.queue_declare(queue_temp, err) < 0) { cout << "连接RabbitMQ提示:" + err << endl; m_rabbitFile = fopen(tempfile.c_str(), "a+"); string temp = message + "n"; if (m_rabbitFile) { std::fputs(temp.c_str(), m_rabbitFile); std::fflush(m_rabbitFile); std::fclose(m_rabbitFile); } return -1; } //将交换机绑定到队列, //if (rb_client.queue_bind(queue_temp, exchange, routkeyname, err)<0) //{ // return -1; //} //将序列化之后的二进制消息放到指定路由对应的消息队列中 if (rb_client.publish(message, duquename, err)<0) { cout << "连接RabbitMQ提示:" + err << endl; m_rabbitFile = fopen(tempfile.c_str(), "a+"); string temp = message + "n"; if (m_rabbitFile) { std::fputs(temp.c_str(), m_rabbitFile); std::fflush(m_rabbitFile); std::fclose(m_rabbitFile); } return -1; } else { if (access(tempfile.c_str(), 0) == 0)//文件存在 { std::ifstream in(tempfile); std::string getmsg; vector vec; while (getline(in, getmsg)) {//逐行读取,遇到换行符 CMessage mg(getmsg); vec.push_back(mg); } in.close(); remove(tempfile.c_str());//删除文件重置 if (vec.size() > 0) { if (rb_client.publish(vec, duquename, err) < 0) { cout << "连接RabbitMQ发送失败信息重发失败:" + err << endl; return -1; } } } } //断开连接 rb_client.__sleep(10); rb_client.Disconnect(); return 0; } #endif } int Vm::Consumer_Rcv(string duquename, vector & vgetmsg, int getnum) { // CRabbitmqClient rb_client("127.0.0.1", 5672, "guest", "guest"); CRabbitmqClient rb_client(m_HostIP, atoi(m_Port.c_str()), m_username, m_password); //CExchange exchange(exchangename); //CQueue queue_temp(duquename); string err = ""; ::timeval tvb = { 1,10 }; //if (m_rabbitmq_client) { //连接服务器 if (rb_client.Connect(err) < 0) { cout << "连接RabbitMQ提示:" + err << endl; return -1; } cout << "连接RabbitMQ服务器成功" << endl; vgetmsg.clear(); //接收 if (rb_client.consumer(duquename, vgetmsg, getnum, &tvb, err)<0) { cout << "连接RabbitMQ提示:" + err << endl; return -1; } rb_client.__sleep(10); rb_client.Disconnect(); return 0; } } int main() { Vm Interface; Interface.setConnectInfo("localhost", "5672", "guest", "guest");//注:本地网页默认登录端口是15672,监听的端口号为5672 Interface.Rab_Connect(); string temp = "测试机台信息发送"; Interface.Producer_Send(_JSONSTR_Machine_status, string_To_UTF8(temp)); Sleep(1000); Interface.Rab_DisConnect(); vector msg; msg.clear(); Interface.Consumer_Rcv("Q.EQPT_LOG_D",msg);//从刚刚的队列中取消息 if(msg.size()>0) cout << "连接RabbitMQ接收到的信息为:" + UTF8_To_string(msg[0]) << endl; int a = 0; scanf("%d", &a); return 0; }
配置好属性后,编译通过,运行如下:
想在rabbitMQ服务端能看到,main函数程序中注释掉
int main()
{
Vm Interface;
Interface.setConnectInfo("localhost", "5672", "guest", "guest");//注:本地网页默认登录端口是15672,监听的端口号为5672
Interface.Rab_Connect();
string temp = "测试机台信息发送";
Interface.Producer_Send(_JSONSTR_Machine_status, string_To_UTF8(temp));
Sleep(1000);
Interface.Rab_DisConnect();
//vector msg;
//msg.clear();
//Interface.Consumer_Rcv("Q.EQPT_LOG_D",msg);//从刚刚的队列中取消息
//if(msg.size()>0)
// cout << "连接RabbitMQ接收到的信息为:" + UTF8_To_string(msg[0]) << endl;
int a = 0;
scanf("%d", &a);
return 0;
}
点击Q.EQPT_LOG_D,查看刚刚创建的队列具体里面的具体内容
下拉点击Get Message就能显示刚刚发送的消息了。
好了,完成了。
3.结语
此文档只有调用程序,没有RabbitMQ c++封装的代码,由于本人也花费了很多精力,此封装程序暂没公开。程序在我主界面资源的下载,保证下载直接就能编译运行使用到各位的c++项目中,如实在不想用积分,请私信。



