栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

open62541 基于mqtt订阅发布

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

open62541 基于mqtt订阅发布

概述

在上篇文章open62541 发布订阅(基于UDP)中主要讲述无代理情况下的订阅/发布实现,本篇主要是讲述如何实现基于mqtt实现的订阅/发布。相关mqtt的知识可以自行百度查看,这里不过多说明,只是将实现功能所需步骤进行讲述。下面将会介绍一下windows下mqtt安装以及使用。

windows下mosquitto使用

1、下载mosquitto
链接:https://mosquitto.org/download/
2、安装
安装完毕后进入安装目录,如下:

3、配置

  • 关闭匿名登录
    默认情况下mosquitto是开启了匿名登录的,我们可以根据自己需求对其进行开关,配置文件在软件安装目录下名为mosquitto.conf,我们将其注释的部分进行修改如下,并且添加密码文件路径

这样我们就关闭了匿名登录,在启动mosquitto时通过添加参数-v -c mosquitto.conf就可以进行配置。

  • 添加用户名和密码
    使用命令mosquitto_passwd.exe -c pwfile.examle username,如下添加mqtt用户

    连续输入两次密码后即添加成功,-c参数是以追加方式进行添加用户,不会删除原有的用户。添加的用户信息在pwfile.example文件中存储,如下

    这样我们添加用户就成功了,使用MQTTX等软件进行连接时需要使用该账号密码。

4、使用
使用之前需要运行mosquitto.exe,在安装目录中打开命令行

输入cmd,回车,在命令行中输入mosquitto.exe -v -c mosquitto.conf。

这样mosquitto就启动成功了,我们可以使用sub和pub功能。

默认端口号:1883
默认Borker Address:127.0.0.1

这里以主题名称为topic1进行说明

订阅端:

mosquitto_sub.exe -t topic1 -u mqtt -P 123456

发布端:

mosquitto_pub.exe -t topic1 -m “hello world” -u mqtt -P 123456

运行结果如下:

本地测试,后面用户名和密码可以省略。

MQTTX使用

1、概述
MQTTX是一款专业的开源跨平台MQTT 5.0客户端工具,MQTTX软件支持macOS, Linux, Windows,并且还可以支持MQTT消息格式转换,通过MQTTX软件用户能够快速创建连接保存并同时建立多个连接客户端。接下来简述一下其使用。

2、安装
下载地址:https://mqttx.app/

3、使用

  • 连接
    打开MQTTX软件后,新建连接

    弹出的窗口中我们设定相对应的信息,

    之后进行连接

    这样就连接成功了,我们可以添加订阅topic或者发布信息到topic中。添加订阅如下:

    设定好我们要订阅的top名称,这里使用testtopic,我们使用mosquitto_pub.exe去像testtopic主题发布一个信息

    可见,MQTTX客户端成功获取到对应信息。发布的话只要指定发布topic和设置内容即可,

    这样内容就发布出去了,我们使用mosquitto_sub.exe去订阅对应testtopic主题也能对应获取到,这里就不进行展示了。

注意事项:
以上操作都要将mosquitto.exe运行起来,否则不管是连接还是发布都无法使用,为了省去这一步麻烦操作我们可以去windows中启动mosquitto服务

正常都是能够启动的,有时候重新安装后与原路径不一致,注册表未清空就会碰到下面的问题

这个时候我们只需去找到其注册表将其路径改为现在安装mosquitto的路径即可解决。在关闭匿名登录时需要添加 -v -c ${mosquittoDir}mosquitto.conf参数,其中mosquittoDir是安装目录。

windows下配置pthread

由于在open62541中有关mqtt发布的接口涉及到pthread相关接口,因此我们需要配置一下pthread。
1、下载库
链接:http://sourceware.org/pub/pthreads-win32/pthreads-w32-2-9-1-release.zip
2、将库配置到vs中
将下载好的库解压,如下

其中我们需要的是Pre-built.2这个文件夹下的文件,dll、include、和lib

将include中的.h文件全部拷贝至VS安装目录下VC文件夹中include目录下

例如我这里路径:C:Program Files (x86)Microsoft Visual Studio 12.0VCinclude

将lib文件夹中x64和x86文件夹拷贝至VS安装目录下VC文件夹中lib目录下

例如我这里路径:C:Program Files (x86)Microsoft Visual Studio 12.0VClib

将dll文件夹下x64文件夹中所有dll拷贝至C:WindowsSystem32(64位运行库),将x86文件夹中所有dll拷贝至C:WindowsSysWOW64(32位运行库)

这样基本的环境就配置完成了,接下来开始步入正题,使用代码去实现基于mqtt的发布功能。

基于mqtt发布
  • 编译open62541
    勾选UA_AMALGAMATION、UA_ENABLE_PUBSUB、UA_ENABLE_JSON_ENCODING、UA_ENABLE_PUBSUB_MQTT四个选项


点击Generate后将生成的工程进行编译生成open62541.h和open62541.c

  • 基于本地mosquitto发布
    这里以open62541中自带例程tutorial_pubsub_mqtt_publish.c进行叙述,代码如下
#include "open62541.h"
#include "ua_network_pubsub_mqtt.h"
#include 

#pragma comment(lib, "x86/pthreadVC2.lib")

#define CONNECTION_NAME              "MQTT Publisher Connection"
#define TRANSPORT_PROFILE_URI        "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt"
#define MQTT_CLIENT_ID               "TESTCLIENTPUBSUBMQTT"
#define CONNECTIONOPTION_NAME        "mqttClientId"
#define PUBLISHER_TOPIC              "customTopic"
#define PUBLISHER_METADATAQUEUENAME  "MetaDataTopic"
#define PUBLISHER_METADATAUPDATETIME 0
#define BROKER_ADDRESS_URL           "opc.mqtt://127.0.0.1:1883"
#define PUBLISH_INTERVAL             500

// Uncomment the following line to enable MQTT login for the example
// #define EXAMPLE_USE_MQTT_LOGIN

#ifdef EXAMPLE_USE_MQTT_LOGIN
#define USERNAME_OPTION_NAME         "mqttUsername"
#define PASSWORD_OPTION_NAME         "mqttPassword"
#define MQTT_USERNAME                "open62541user"
#define MQTT_PASSWORD                "open62541"
#endif

// Uncomment the following line to enable MQTT via TLS for the example
// #define EXAMPLE_USE_MQTT_TLS

#ifdef EXAMPLE_USE_MQTT_TLS
#define USE_TLS_OPTION_NAME             "mqttUseTLS"
#define MQTT_CA_FILE_PATH_OPTION_NAME   "mqttCaFilePath"
#define CA_FILE_PATH                    "/path/to/server.cert"
#endif

#ifdef UA_ENABLE_JSON_ENCODING
static UA_Boolean useJson = true;
#else
static UA_Boolean useJson = false;
#endif

static UA_NodeId connectionIdent;
static UA_NodeId publishedDataSetIdent;
static UA_NodeId writerGroupIdent;

static void
addPubSubConnection(UA_Server *server, char *addressUrl) {
    
    UA_PubSubConnectionConfig connectionConfig;
    memset(&connectionConfig, 0, sizeof(connectionConfig));
    connectionConfig.name = UA_STRING(CONNECTION_NAME);
    connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI);
    connectionConfig.enabled = UA_TRUE;

    
    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING(addressUrl)};
    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
    
    connectionConfig.publisherId.numeric = 2234;

    
    const int connectionOptionsCount = 1
#ifdef EXAMPLE_USE_MQTT_LOGIN
    + 2
#endif
#ifdef EXAMPLE_USE_MQTT_TLS
    + 2
#endif
    ;

    UA_KeyValuePair connectionOptions[1];

    size_t connectionOptionIndex = 0;
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
    UA_String mqttClientId = UA_STRING(MQTT_CLIENT_ID);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);

#ifdef EXAMPLE_USE_MQTT_LOGIN
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, USERNAME_OPTION_NAME);
    UA_String mqttUsername = UA_STRING(MQTT_USERNAME);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttUsername, &UA_TYPES[UA_TYPES_STRING]);

    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, PASSWORD_OPTION_NAME);
    UA_String mqttPassword = UA_STRING(MQTT_PASSWORD);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttPassword, &UA_TYPES[UA_TYPES_STRING]);
#endif

#ifdef EXAMPLE_USE_MQTT_TLS
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, USE_TLS_OPTION_NAME);
    UA_Boolean mqttUseTLS = true;
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttUseTLS, &UA_TYPES[UA_TYPES_BOOLEAN]);

    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, MQTT_CA_FILE_PATH_OPTION_NAME);
    UA_String mqttCaFile = UA_STRING(CA_FILE_PATH);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttCaFile, &UA_TYPES[UA_TYPES_STRING]);
#endif

    connectionConfig.connectionProperties = connectionOptions;
    connectionConfig.connectionPropertiesSize = connectionOptionIndex;

    UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
}


static void
addPublishedDataSet(UA_Server *server) {
    
    UA_PublishedDataSetConfig publishedDataSetConfig;
    memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
    publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
    publishedDataSetConfig.name = UA_STRING("Demo PDS");
    
    UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
}


static void
addDataSetField(UA_Server *server) {
    
    UA_DataSetFieldConfig dataSetFieldConfig;
    memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
    dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;

    dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
    dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
    dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
        UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
    dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
    UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, NULL);
}


static UA_StatusCode
addWriterGroup(UA_Server *server, char *topic, int interval) {
    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    
    UA_WriterGroupConfig writerGroupConfig;
    memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
    writerGroupConfig.name = UA_STRING("Demo WriterGroup");
    writerGroupConfig.publishingInterval = interval;
    writerGroupConfig.enabled = UA_FALSE;
    writerGroupConfig.writerGroupId = 100;
    UA_UadpWriterGroupMessageDataType *writerGroupMessage;

    
#ifdef UA_ENABLE_JSON_ENCODING
    UA_JsonWriterGroupMessageDataType *Json_writerGroupMessage;
    
    if(useJson) {
        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;
        writerGroupConfig.messageSettings.encoding             = UA_EXTENSIONOBJECT_DECODED;

        writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONWRITERGROUPMESSAGEDATATYPE];
        
        Json_writerGroupMessage = UA_JsonWriterGroupMessageDataType_new();
        
        Json_writerGroupMessage->networkMessageContentMask =
            (UA_JsonNetworkMessageContentMask)(UA_JSONNETWORKMESSAGECONTENTMASK_NETWORKMESSAGEHEADER |
            (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_DATASETMESSAGEHEADER |
            (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_SINGLEDATASETMESSAGE |
            (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_PUBLISHERID |
            (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_DATASETCLASSID);
        writerGroupConfig.messageSettings.content.decoded.data = Json_writerGroupMessage;
    }

    else
#endif
    {
        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
        writerGroupConfig.messageSettings.encoding             = UA_EXTENSIONOBJECT_DECODED;
        writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
        
        writerGroupMessage  = UA_UadpWriterGroupMessageDataType_new();
        
        writerGroupMessage->networkMessageContentMask =
            (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
            (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
            (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
            (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
        writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
    }


    
    UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
    
    
    brokerTransportSettings.queueName = UA_STRING(topic);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;

    
    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;

    
    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;

    writerGroupConfig.transportSettings = transportSettings;
    retval = UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);

    if (retval == UA_STATUSCODE_GOOD)
        UA_Server_setWriterGroupOperational(server, writerGroupIdent);

#ifdef UA_ENABLE_JSON_ENCODING
    if (useJson) {
        UA_JsonWriterGroupMessageDataType_delete(Json_writerGroupMessage);
    }
#endif

    if (!useJson && writerGroupMessage) {
        UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage);
    }

    return retval;
}


static void
addDataSetWriter(UA_Server *server, char *topic) {
    
    UA_NodeId dataSetWriterIdent;
    UA_DataSetWriterConfig dataSetWriterConfig;
    memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
    dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
    dataSetWriterConfig.dataSetWriterId = 62541;
    dataSetWriterConfig.keyFrameCount = 10;

#ifdef UA_ENABLE_JSON_ENCODING
    UA_JsonDataSetWriterMessageDataType jsonDswMd;
    UA_ExtensionObject messageSettings;
    if(useJson) {
        
        jsonDswMd.dataSetMessageContentMask = (UA_JsonDataSetMessageContentMask)
            (UA_JSONDATASETMESSAGECONTENTMASK_DATASETWRITERID |
             UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER |
             UA_JSONDATASETMESSAGECONTENTMASK_STATUS |
             UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION |
             UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP);

        messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
        messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE];
        messageSettings.content.decoded.data = &jsonDswMd;

        dataSetWriterConfig.messageSettings = messageSettings;
    }
#endif
    
    
    
    
    UA_BrokerDataSetWriterTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerDataSetWriterTransportDataType));

    
    brokerTransportSettings.queueName = UA_STRING(topic);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
    brokerTransportSettings.metaDataQueueName = UA_STRING(PUBLISHER_METADATAQUEUENAME);
    brokerTransportSettings.metaDataUpdateTime = PUBLISHER_METADATAUPDATETIME;

    
    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;

    
    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERDATASETWRITERTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;

    dataSetWriterConfig.transportSettings = transportSettings;
    UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
                               &dataSetWriterConfig, &dataSetWriterIdent);
}


UA_Boolean running = true;
static void stopHandler(int sign) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
    running = false;
}

static void usage(void) {
    printf("Usage: tutorial_pubsub_mqtt_publish [--url ] "
           "[--topic ] "
           "[--freq  "
           "[--json]n"
           "  Defaults are:n"
           "  - Url: opc.mqtt://127.0.0.1:1883n"
           "  - Topic: customTopicn"
           "  - Frequency: 500n"
           "  - JSON: Offn");
}

int main(int argc, char **argv) {
    signal(SIGINT, stopHandler);
    signal(SIGTERM, stopHandler);

    
    char *addressUrl = BROKER_ADDRESS_URL;
    char *topic = PUBLISHER_TOPIC;
    int interval = PUBLISH_INTERVAL;

    
    for(int argpos = 1; argpos < argc; argpos++) {
        if(strcmp(argv[argpos], "--help") == 0) {
            usage();
            return 0;
        }

        if(strcmp(argv[argpos], "--json") == 0) {
            useJson = true;
            continue;
        }

        if(strcmp(argv[argpos], "--url") == 0) {
            if(argpos + 1 == argc) {
                usage();
                return -1;
            }
            argpos++;
            addressUrl = argv[argpos];
            continue;
        }

        if(strcmp(argv[argpos], "--topic") == 0) {
            if(argpos + 1 == argc) {
                usage();
                return -1;
            }
            argpos++;
            topic = argv[argpos];
            continue;
        }

        if(strcmp(argv[argpos], "--freq") == 0) {
            if(argpos + 1 == argc) {
                usage();
                return -1;
            }
            if(sscanf(argv[argpos], "%d", &interval) != 1) {
                usage();
                return -1;
            }
            if(interval <= 10) {
                UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                               "Publication interval too small");
                return -1;
            }
            continue;
        }

        usage();
        return -1;
    }

    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    
    UA_Server *server = UA_Server_new();
    UA_ServerConfig *config = UA_Server_getConfig(server);
    
    UA_ServerConfig_setDefault(config);
    UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerMQTT());

    addPubSubConnection(server, addressUrl);
    addPublishedDataSet(server);
    addDataSetField(server);
    retval = addWriterGroup(server, topic, interval);
    if (UA_STATUSCODE_GOOD != retval)
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Error Name = %s", UA_StatusCode_name(retval));
        return EXIT_FAILURE;
    }
    addDataSetWriter(server, topic);
    //UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, connectionIdent);

    //if(!connection) {
    //    UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
    //                   "Could not create a PubSubConnection");
    //    UA_Server_delete(server);
    //    return -1;
    //}

    UA_Server_run(server, &running);
    UA_Server_delete(server);
    return 0;
}

这里对其代码没做什么修改,只是引入pthread库以及将UA_PubSubConnection_findConnectionbyId(server, connectionIdent);部分进行了注释,因为其实现是在open62541.c中,不进行注释的话会编译不通过。这里注释的话影响也不大,也可以通过addPubSubConnection(server, addressUrl);中UA_Server_addPubSubConnection接口返回值进行判断。流程与基于UDP的订阅/发布基本一致,可以参考open62541 发布订阅(基于UDP)一文。这里主要叙述编译所需要的一些文件以及修改。
这里需要的文件比较多,不是直接加入.h和.c就能直接编译运行,这里看一下工程目录

这里我们需要引入mqtt.c,ua_mqtt_adapter.c,ua_mqtt_pal.c和ua_network_pubsub_mqtt.c一起进行编译,对应的mqtt.h,ua_mqtt_adapter.h,ua_mqtt_pal.h和ua_network_pubsub_mqtt.h在open62541中都能够找到。我们将需要的.h都放入工程目录下,然后将对应的文件中包含的头文件进行修改,

其他.h和.c中的也是类型修改,这里就不再多说。进行编译时mqtt.c会进行报错如下,

这里可能是编译选项的原因,因为我们没有使用TLS,有些参数没有定义,所以可以将if(useTLS)这段语句块进行注释,重新编译后就能通过了

这个工程是向地址127.0.0.1端口号为1883的地址发布服务器的系统时间,其中主题为customTopic,我们可以运行然后使用MQTTX软件查看效果

可以看到我们使用MQTTX软件和mosquitto_sub.exe去订阅customTopic主题内容,都能够正常获取到发布的系统时间。

注意:这里没有使用账号密码去连接,因此mosquitto需要打开匿名登录,不然会导致连接不上的问题

  • 发布至阿里云物联网平台
    有关如何搭建阿里云MQTT,可以参考阿里云MQTT使用教程,例程tutorial_pubsub_mqtt_publish.c中默认是使用匿名登录的,我们需要将其打开,然后将连接mqtt的必要参数进行修改即可,修改部分如下,
...........
#define CONNECTION_NAME              "MQTT Publisher Connection"
#define TRANSPORT_PROFILE_URI        "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt"
#define MQTT_CLIENT_ID               "001|securemode=3,signmethod=hmacsha1|"//"TESTCLIENTPUBSUBMQTT"
#define CONNECTIONOPTION_NAME        "mqttClientId"
#define PUBLISHER_TOPIC              "/a1XDD7aoXtS/mydevice/user/mytopic"//"customTopic"
#define PUBLISHER_METADATAQUEUENAME  "MetaDataTopic"
#define PUBLISHER_METADATAUPDATETIME 0
#define BROKER_ADDRESS_URL           "opc.mqtt://a1737QwbwMC.iot-as-mqtt.cn-shanghai.aliyuncs.com:1883"//"opc.mqtt://127.0.0.1:1883"    
#define PUBLISH_INTERVAL             1000

// Uncomment the following line to enable MQTT login for the example
#define EXAMPLE_USE_MQTT_LOGIN

#ifdef EXAMPLE_USE_MQTT_LOGIN
#define USERNAME_OPTION_NAME         "mqttUsername"
#define PASSWORD_OPTION_NAME         "mqttPassword"
#define MQTT_USERNAME                "mydevice&a1XDD7aoXtS"//"mqtt"//
#define MQTT_PASSWORD                "8c22a9d36c6bedf4e01739c4a74c82248847a649"//"123456"//
#endif
...........

与MQTTX软件连接的参数一致,然后运行

可以看到以及在向/a1XDD7aoXtS/mydevice/user/mytopic这个topic发布信息,这个时候我们可以打开阿里云中设备查看日志,

可见我们的系统时间已经成功发布了。

注意:这个部分数组要改为3,不然会导致越界

结尾

这部分相比基于UDP订阅/发布来说主要就是环境配置比较麻烦,其他的操作都类似。注意编译open62541时UA_ENABLE_JSON_ENCODING这个选项一定要勾选,不然发布的信息在MQTTX中订阅会显示乱码。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/828505.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号