这2天在预研用C实现向mqtt服务器上发送mqtt消息.
在github上逛了一圈,看见一个大神写好的demo工程TT3。
学习之后,整理出一个自己的demo工程 test_paho_mqtt_on_vs2017_mfc_console.
正式用的时候,这是个mqtt转发程序(只管发送),通过socket接收其他设备驱动(dll 插件)发来的状态信息,组成第三方约定的格式内容,转发到指定的第三方mqtt服务器上。
demo运行效果 工程下载src_test_paho_mqtt_on_vs2017_mfc_console.zip
笔记 建立远程测试用的mqtt服务器:参见《mqtt服务器(mosquitto)测试环境的搭建》
建立vs2017MFC控制台工程单独做了试验
vs2017建立MFC控制台工程
github地址 eclipse-paho-mqtt-c
下载以下发布版
eclipse-paho-mqtt-c-win32-1.3.9.zip
eclipse-paho-mqtt-c-win64-1.3.9.zip
我做的demo是x86多字节版本,只需要解压eclipse-paho-mqtt-c-win32-1.3.9.zip到工程,改名为paho_mqtt
添加附加包含目录 .;paho_mqtt\include;
添加附加库目录 .\paho_mqtt\lib
// @file CMyMqtt.h
#include "MQTTClient.h"
#pragma comment(lib, "paho-mqtt3c.lib")
#pragma once
/// Small wrapper around one synchronous Paho MQTT C client handle.
///
/// Typical use: connect_mqtt_server() -> mqtt_msg_sub() / mqtt_msg_pub() -> close_mqtt_connect().
class CMyMqtt
{
public:
    CMyMqtt();
    virtual ~CMyMqtt();

    // The object holds a raw MQTTClient handle; copies would share that single handle,
    // so copying is disabled.
    CMyMqtt(const CMyMqtt&) = delete;
    CMyMqtt& operator=(const CMyMqtt&) = delete;

public:
    /// Creates the client for "<ip>:<port>" and connects with the given credentials.
    /// Command-line equivalent of a publish against the same broker:
    /// mosquitto_pub -h 192.168.1.22 -p 39999 -u user1 -P pwd1 -t 'test/topic' -m "hello mqtt"
    /// @return true when the client is connected (also when it was already connected).
    bool connect_mqtt_server(
        const char* psz_mqtt_server_ip,
        const char* psz_mqtt_server_port,
        const char* psz_mqtt_user_name,
        const char* psz_mqtt_user_pwd,
        const char* psz_mqtt_topic);

    /// Subscribes to psz_mqtt_topic; returns true on success.
    bool mqtt_msg_sub(const char* psz_mqtt_topic);

    /// Publishes the NUL-terminated text psz_mqtt_msg on psz_mqtt_topic; returns true on success.
    bool mqtt_msg_pub(const char* psz_mqtt_topic, const char* psz_mqtt_msg);

    /// Disconnects from the broker; returns true when the disconnect call succeeded.
    bool close_mqtt_connect();

    /// Converts an ANSI string through the ATL A2W macro and returns it as a CString.
    CString myA2W(const char* psz_in);

private:
    // Both members get a defined initial value: connect_mqtt_server() tests TTclient
    // before it ever assigns it.
    MQTTClient TTclient = nullptr;
    bool MQTTConnected = false;
};
#include "pch.h"
#include "CMyMqtt.h"
// The third directive had lost its header name; atlconv.h provides USES_CONVERSION / A2W.
#include <atlconv.h>
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <cstring>
/// Starts with no client handle and in the disconnected state.
/// TTclient must be null here because connect_mqtt_server() tests it before creating a client.
CMyMqtt::CMyMqtt()
    : TTclient(nullptr)
    , MQTTConnected(false)
{
}

/// Releases the client handle, disconnecting first if the connection is still marked open.
CMyMqtt::~CMyMqtt()
{
    if (nullptr == TTclient)
    {
        return;
    }

    if (MQTTConnected)
    {
        // Best effort: nothing useful can be done with a failure while destroying the object.
        MQTTClient_disconnect(TTclient, 2000);
        MQTTConnected = false;
    }

    MQTTClient_destroy(&TTclient);
}
/// Converts psz_in with the ATL A2W macro and returns the result as a CString.
CString CMyMqtt::myA2W(const char* psz_in)
{
    USES_CONVERSION; // required by the A2W macro
    return CString(A2W(psz_in));
}
/// Delivery-complete callback registered with MQTTClient_setCallbacks; intentionally a no-op.
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    (void)context;
    (void)dt;
}
/// Connection-lost callback registered with MQTTClient_setCallbacks.
/// Only reports the event on stdout; neither argument is used.
void connlost(void *context, char *cause)
{
    (void)context;
    (void)cause;
    // The line terminator had been lost ("...n"), which glued this message to the next output.
    printf("Not connected\n");
}
/// Message-arrived callback registered with MQTTClient_setCallbacks.
/// Prints the topic and payload, then releases the message and the topic string.
/// @return always 1.
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    (void)context;
    (void)topicLen;

    // The payload carries an explicit length, so print it with a bounded "%.*s"
    // instead of copying it into a malloc'ed buffer whose allocation was never checked.
    const char *payload_text = (NULL != message->payload)
        ? static_cast<const char *>(message->payload)
        : "";
    printf("message in : %s : %.*s\n", topicName, message->payloadlen, payload_text);

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}
/// Creates a client for "<ip>:<port>" and connects to it with user name and password.
///
/// @param psz_mqtt_server_ip    broker host, combined with the port into the server address
/// @param psz_mqtt_server_port  broker port as text
/// @param psz_mqtt_user_name    user name placed into the connect options
/// @param psz_mqtt_user_pwd     password placed into the connect options
/// @param psz_mqtt_topic        passed as the third argument of MQTTClient_create
///                              (the client identifier in the Paho API)
/// @return true when connected, including the case where a connection was already established;
///         false on a null argument, a failed create or a failed connect.
bool CMyMqtt::connect_mqtt_server(
    const char* psz_mqtt_server_ip,
    const char* psz_mqtt_server_port,
    const char* psz_mqtt_user_name,
    const char* psz_mqtt_user_pwd,
    const char* psz_mqtt_topic)
{
    if (MQTTConnected)
    {
        return true;
    }

    if ((NULL == psz_mqtt_server_ip)
        || (NULL == psz_mqtt_server_port)
        || (NULL == psz_mqtt_user_name)
        || (NULL == psz_mqtt_user_pwd)
        || (NULL == psz_mqtt_topic))
    {
        return false;
    }

    // Drop a handle left over from an earlier attempt before creating a new one.
    if (TTclient)
    {
        MQTTClient_destroy(&TTclient);
    }

    CString cSConnect = psz_mqtt_server_ip;
    cSConnect += ":";
    cSConnect += psz_mqtt_server_port;

    // GetString() gives read-only access; GetBuffer() would require a matching ReleaseBuffer().
    const int create_rc = MQTTClient_create(
        &TTclient,
        cSConnect.GetString(),
        psz_mqtt_topic,
        MQTTCLIENT_PERSISTENCE_DEFAULT,
        NULL);
    if ((MQTTCLIENT_SUCCESS != create_rc) || (NULL == TTclient))
    {
        printf("failed : MQTTClient_create()\n");
        return false;
    }

    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 0;
    conn_opts.reliable = 0;
    conn_opts.connectTimeout = 3;
    conn_opts.username = psz_mqtt_user_name;
    conn_opts.password = psz_mqtt_user_pwd;

    MQTTClient_setCallbacks(TTclient, NULL, connlost, msgarrvd, delivered);

    if (MQTTClient_connect(TTclient, &conn_opts) == MQTTCLIENT_SUCCESS)
    {
        printf("mqtt server connect ok!\n");
        MQTTConnected = true;
    }
    else
    {
        printf("mqtt server connect failed!\n");
    }

    return MQTTConnected;
}
bool CMyMqtt::mqtt_msg_sub(const char* psz_mqtt_topic)
{
bool b_rc = false;
int QOS = 0;
do {
if (NULL == psz_mqtt_topic) {
break;
}
if (MQTTClient_subscribe(TTclient, psz_mqtt_topic, QOS) == MQTTCLIENT_SUCCESS)
{
b_rc = true;
printf("sub [%s] okn", psz_mqtt_topic);
}
} while (false);
return b_rc;
}
bool CMyMqtt::mqtt_msg_pub(const char* psz_mqtt_topic, const char* psz_mqtt_msg)
{
bool b_rc = false;
int QOS = 0;
int retained = 0;
MQTTClient_deliveryToken dt = 0;
int i_rc = 0;
do {
if ((NULL == psz_mqtt_topic) || (NULL == psz_mqtt_msg))
{
break;
}
i_rc = MQTTClient_publish(TTclient, psz_mqtt_topic, strlen(psz_mqtt_msg), psz_mqtt_msg, QOS, retained, &dt);
if (MQTTCLIENT_SUCCESS == i_rc)
{
b_rc = true;
}
} while (false);
return b_rc;
}
/// Disconnects from the broker, allowing up to 2000 ms for the disconnect call.
/// The connected flag is cleared whether or not the call succeeds.
/// @return true when the disconnect call succeeded; false when it failed or when
///         no connection had been established.
bool CMyMqtt::close_mqtt_connect()
{
    // Never connected: there may be no valid handle to hand to the library.
    if (!MQTTConnected)
    {
        return false;
    }

    const bool b_rc = (MQTTClient_disconnect(TTclient, 2000) == MQTTCLIENT_SUCCESS);
    MQTTConnected = false;
    return b_rc;
}
mqtt测试代码
// test_paho_mqtt_on_vs2017_mfc_console.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include "pch.h"
#include "framework.h"
#include "test_paho_mqtt_on_vs2017_mfc_console.h"
#include "CMyMqtt.h"
#ifdef _DEBUG
#define new DEBUG_NEW
#endif
// The one and only application object
CWinApp theApp;
using namespace std;
// Demo routine, defined after main().
void my_app();
/// Entry point: initializes MFC for this console process, then runs the MQTT demo.
/// @return 0 after the demo has run, 1 when the module handle or MFC initialization failed.
int main()
{
    HMODULE hModule = ::GetModuleHandle(nullptr);
    if (hModule == nullptr)
    {
        // TODO: change the error code to suit your needs
        wprintf(L"错误: GetModuleHandle 失败\n");
        return 1;
    }

    // Initialize MFC and report an error on failure
    if (!AfxWinInit(hModule, nullptr, ::GetCommandLine(), 0))
    {
        wprintf(L"错误: MFC 初始化失败\n");
        return 1;
    }

    my_app();
    return 0;
}
/// Demo run: connect, subscribe to "my/iot", publish "test 1".."test 3" on the same topic,
/// wait ten seconds so the subscription can echo the messages, then disconnect.
void my_app()
{
    CMyMqtt _mqtt;
    const char* psz_my_iot_topic = "my/iot";

    printf("hello ~ mqtt, connect, sub, pub, disconnect :P\n");

    const bool b_connected = _mqtt.connect_mqtt_server(
        "192.168.1.103", // change this IP when the broker moves to another server
        "39999",
        "user1",
        "pwd1",
        psz_my_iot_topic);

    if (b_connected)
    {
        if (_mqtt.mqtt_msg_sub(psz_my_iot_topic))
        {
            CString cs_msg;
            for (int i_tmp = 1; i_tmp <= 3; ++i_tmp)
            {
                cs_msg.Format("test %d", i_tmp);
                if (!_mqtt.mqtt_msg_pub(psz_my_iot_topic, cs_msg))
                {
                    printf("mqtt pub failed!\n");
                    break;
                }

                // GetString() is the read-only accessor; GetBuffer() is meant for writing.
                printf("mqtt pub ok : [%s][%s]\n", psz_my_iot_topic, cs_msg.GetString());
            }

            // Publishing has already finished; the wait only leaves time to see
            // whether the subscription delivers the messages back.
            Sleep(10 * 1000);
        }

        // Disconnect on every path that connected, including a failed subscribe.
        _mqtt.close_mqtt_connect();
    }

    printf("app END\n");
}



