I am trying to get the open62541 MQTT publish/subscribe mechanism to work. I tried using the provided examples (tutorial_pubsub_mqtt_publish.c
and tutorial_pubsub_mqtt_subscribe.c
) as a baseline, but the subscriber example seems to have issues. At the moment, I am able to send messages, but the messages do not seem to update the subscriber-side nodes upon receiving them. I am running the two applications locally in combination with the mosquitto MQTT broker. The publisher side code is taken from the aforementioned example code file, while the subscriber code was slightly modified:
Subscriber side code
#include <open62541/common.h>
#include <open62541/plugin/log.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/plugin/nodestore.h>
#include <open62541/server.h>
#include <open62541/server_pubsub.h>
#include <open62541/plugin/securitypolicy_default.h>
#include <open62541/types.h>
#include <stdio.h>
#define CONNECTION_NAME "MQTT Subscriber Connection"
#define TRANSPORT_PROFILE_URI_UADP "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-uadp"
#define TRANSPORT_PROFILE_URI_JSON "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-json"
#define MQTT_CLIENT_ID "TESTCLIENTPUBSUBMQTTSUBSCRIBE"
#define CONNECTIONOPTION_NAME "mqttClientId"
#define SUBSCRIBER_TOPIC "customTopic"
#define SUBSCRIBER_METADATAQUEUENAME "MetaDataTopic"
#define SUBSCRIBER_METADATAUPDATETIME 0
#define BROKER_ADDRESS_URL "opc.mqtt://127.0.0.1:1883"
static UA_Boolean useJson = false;
UA_NodeId connectionIdent;
UA_NodeId readerGroupIdent;
UA_NodeId readerIdent;
UA_DataSetReaderConfig readerConfig;
static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData);
static UA_StatusCode
addPubSubConnection(UA_Server *server, char *addressUrl) {
UA_StatusCode retval = UA_STATUSCODE_GOOD;
UA_PubSubConnectionConfig connectionConfig;
memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
connectionConfig.name = UA_STRING(CONNECTION_NAME);
if (useJson) {
connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI_JSON);
} else {
connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI_UADP);
}
connectionConfig.enabled = UA_TRUE;
/* configure address of the mqtt broker (local on default port) */
UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING(addressUrl)};
UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
&UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
connectionConfig.publisherId.idType = UA_PUBLISHERIDTYPE_UINT16;
connectionConfig.publisherId.id.uint16 = 2234;
UA_KeyValuePair connectionOptions[1];
UA_String mqttClientId = UA_STRING(MQTT_CLIENT_ID);
connectionOptions[0].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
UA_Variant_setScalar(&connectionOptions[0].value, &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);
connectionConfig.connectionProperties.map = connectionOptions;
connectionConfig.connectionProperties.mapSize = 1;
retval |= UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
return retval;
}
/* Add ReaderGroup to the created connection */
static UA_StatusCode
addReaderGroup(UA_Server *server) {
if(server == NULL) {
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
UA_ReaderGroupConfig readerGroupConfig;
memset(&readerGroupConfig, 0, sizeof(UA_ReaderGroupConfig));
readerGroupConfig.name = UA_STRING("ReaderGroup1");
if(useJson)
readerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;
/* configure the mqtt publish topic */
UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
/* Assign the Topic at which MQTT publish should happen */
brokerTransportSettings.queueName = UA_STRING(SUBSCRIBER_TOPIC);
brokerTransportSettings.resourceUri = UA_STRING_NULL;
brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
/* Choose the QOS Level for MQTT */
brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
/* Encapsulate config in transportSettings */
UA_ExtensionObject transportSettings;
memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERDATASETREADERTRANSPORTDATATYPE];
transportSettings.content.decoded.data = &brokerTransportSettings;
readerGroupConfig.transportSettings = transportSettings;
retval |= UA_Server_addReaderGroup(server, connectionIdent, &readerGroupConfig,
&readerGroupIdent);
UA_Server_enableReaderGroup(server, readerGroupIdent);
return retval;
}
/* Add DataSetReader to the ReaderGroup */
static UA_StatusCode
addDataSetReader(UA_Server *server) {
if(server == NULL) {
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
memset(&readerConfig, 0, sizeof(UA_DataSetReaderConfig));
readerConfig.name = UA_STRING("DataSet Reader 1");
/* Parameters to filter which DataSetMessage has to be processed
* by the DataSetReader */
/* The following parameters are used to show that the data published by
* tutorial_pubsub_mqtt_publish.c is being subscribed and is being updated in
* the information model */
UA_UInt16 publisherIdentifier = 2234;
readerConfig.publisherId.idType = UA_PUBLISHERIDTYPE_UINT16;
readerConfig.publisherId.id.uint16 = publisherIdentifier;
readerConfig.writerGroupId = 100;
readerConfig.dataSetWriterId = 62541;
#ifdef UA_ENABLE_PUBSUB_MONITORING
readerConfig.messageReceiveTimeout = 10;
#endif
/* Setting up Meta data configuration in DataSetReader */
fillTestDataSetMetaData(&readerConfig.dataSetMetaData);
retval |= UA_Server_addDataSetReader(server, readerGroupIdent, &readerConfig,
&readerIdent);
UA_Server_enableDataSetReader(server, readerIdent);
return retval;
}
static UA_StatusCode
addSubscribedVariables (UA_Server *server, UA_NodeId dataSetReaderId) {
if(server == NULL)
return UA_STATUSCODE_BADINTERNALERROR;
UA_StatusCode retval = UA_STATUSCODE_GOOD;
UA_NodeId folderId;
UA_String folderName = readerConfig.dataSetMetaData.name;
UA_ObjectAttributes oAttr = UA_ObjectAttributes_default;
UA_QualifiedName folderBrowseName;
if(folderName.length > 0) {
oAttr.displayName.locale = UA_STRING ("en-US");
oAttr.displayName.text = folderName;
folderBrowseName.namespaceIndex = 1;
folderBrowseName.name = folderName;
}
else {
oAttr.displayName = UA_LOCALIZEDTEXT ("en-US", "Subscribed Variables");
folderBrowseName = UA_QUALIFIEDNAME (1, "Subscribed Variables");
}
UA_Server_addObjectNode(server, UA_NODEID_NULL, UA_NS0ID(OBJECTSFOLDER),
UA_NS0ID(ORGANIZES), folderBrowseName,
UA_NS0ID(BASEOBJECTTYPE), oAttr, NULL, &folderId);
/**
* **TargetVariables**
*
* The SubscribedDataSet option TargetVariables defines a list of Variable mappings between
* received DataSet fields and target Variables in the Subscriber AddressSpace.
* The values subscribed from the Publisher are updated in the value field of these variables */
/* Create the TargetVariables with respect to DataSetMetaData fields */
UA_FieldTargetVariable *targetVars = (UA_FieldTargetVariable *)
UA_calloc(readerConfig.dataSetMetaData.fieldsSize, sizeof(UA_FieldTargetVariable));
for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++) {
/* Variable to subscribe data */
UA_VariableAttributes vAttr = UA_VariableAttributes_default;
UA_LocalizedText_copy(&readerConfig.dataSetMetaData.fields[i].description,
&vAttr.description);
vAttr.displayName.locale = UA_STRING("en-US");
vAttr.displayName.text = readerConfig.dataSetMetaData.fields[i].name;
vAttr.dataType = readerConfig.dataSetMetaData.fields[i].dataType;
UA_NodeId newNode;
retval |= UA_Server_addVariableNode(server, UA_NODEID_NUMERIC(1, (UA_UInt32)i + 50000),
folderId, UA_NS0ID(HASCOMPONENT),
UA_QUALIFIEDNAME(1, (char *)readerConfig.dataSetMetaData.fields[i].name.data),
UA_NS0ID(BASEDATAVARIABLETYPE),
vAttr, NULL, &newNode);
/* For creating Targetvariables */
UA_FieldTargetDataType_init(&targetVars[i].targetVariable);
targetVars[i].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE;
targetVars[i].targetVariable.targetNodeId = newNode;
}
retval = UA_Server_DataSetReader_createTargetVariables(server, dataSetReaderId,
readerConfig.dataSetMetaData.fieldsSize, targetVars);
for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++)
UA_FieldTargetDataType_clear(&targetVars[i].targetVariable);
UA_free(targetVars);
UA_free(readerConfig.dataSetMetaData.fields);
return retval;
}
/**
* **DataSetMetaData**
*
* The DataSetMetaData describes the content of a DataSet. It provides the information necessary to decode
* DataSetMessages on the Subscriber side. DataSetMessages received from the Publisher are decoded into
* DataSet and each field is updated in the Subscriber based on datatype match of TargetVariable fields of Subscriber
* and PublishedDataSetFields of Publisher */
/* Define MetaData for TargetVariables */
static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData) {
if(pMetaData == NULL) {
return;
}
UA_DataSetMetaDataType_init (pMetaData);
pMetaData->name = UA_STRING ("DataSet 1");
/* Static definition of number of fields size to 4 to create four different
* targetVariables of distinct datatype
* Currently the publisher sends only DateTime data type */
pMetaData->fieldsSize = 4;
pMetaData->fields = (UA_FieldMetaData*)UA_Array_new (pMetaData->fieldsSize,
&UA_TYPES[UA_TYPES_FIELDMETADATA]);
/* DateTime DataType */
UA_FieldMetaData_init (&pMetaData->fields[0]);
UA_NodeId_copy (&UA_TYPES[UA_TYPES_DATETIME].typeId,
&pMetaData->fields[0].dataType);
pMetaData->fields[0].builtInType = UA_NS0ID_DATETIME;
pMetaData->fields[0].name = UA_STRING ("DateTime");
pMetaData->fields[0].valueRank = -1; /* scalar */
/* Int32 DataType */
UA_FieldMetaData_init (&pMetaData->fields[1]);
UA_NodeId_copy(&UA_TYPES[UA_TYPES_INT32].typeId,
&pMetaData->fields[1].dataType);
pMetaData->fields[1].builtInType = UA_NS0ID_INT32;
pMetaData->fields[1].name = UA_STRING ("Int32");
pMetaData->fields[1].valueRank = -1; /* scalar */
/* Int64 DataType */
UA_FieldMetaData_init (&pMetaData->fields[2]);
UA_NodeId_copy(&UA_TYPES[UA_TYPES_INT64].typeId,
&pMetaData->fields[2].dataType);
pMetaData->fields[2].builtInType = UA_NS0ID_INT64;
pMetaData->fields[2].name = UA_STRING ("Int64");
pMetaData->fields[2].valueRank = -1; /* scalar */
/* Boolean DataType */
UA_FieldMetaData_init (&pMetaData->fields[3]);
UA_NodeId_copy (&UA_TYPES[UA_TYPES_BOOLEAN].typeId,
&pMetaData->fields[3].dataType);
pMetaData->fields[3].builtInType = UA_NS0ID_BOOLEAN;
pMetaData->fields[3].name = UA_STRING ("BoolToggle");
pMetaData->fields[3].valueRank = -1; /* scalar */
}
static void onRead(UA_Server *server,
const UA_NodeId *sessionId, void *sessionContext,
const UA_NodeId *nodeid, void *nodeContext,
const UA_NumericRange *range, const UA_DataValue *data)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Variable read");
}
static void onWrite(UA_Server *server,
const UA_NodeId *sessionId, void *sessionContext,
const UA_NodeId *nodeId, void *nodeContext,
const UA_NumericRange *range, const UA_DataValue *data)
{
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Variable write");
}
static void usage(void) {
printf("Usage: tutorial_pubsub_mqtt_subscribe [--url <opc.mqtt://hostname:port>] "
"[--json]n"
" Defaults are:n"
" - Url: opc.mqtt://127.0.0.1:1883n"
" - JSON: Offn");
}
int main(int argc, char **argv) {
char *addressUrl = BROKER_ADDRESS_URL;
/* Parse arguments */
for(int argpos = 1; argpos < argc; argpos++) {
if(strcmp(argv[argpos], "--help") == 0) {
usage();
return 0;
}
if(strcmp(argv[argpos], "--json") == 0) {
useJson = true;
continue;
}
if(strcmp(argv[argpos], "--url") == 0) {
if(argpos + 1 == argc) {
usage();
return -1;
}
argpos++;
addressUrl = argv[argpos];
continue;
}
}
/* Return value initialized to Status Good */
UA_Server *server = UA_Server_new();
UA_ServerConfig *config = UA_Server_getConfig(server);
config->logging = UA_Log_Stdout;
/* API calls */
/* Add PubSubConnection */
UA_StatusCode retval = addPubSubConnection(server, addressUrl);
if (retval != UA_STATUSCODE_GOOD)
goto cleanup;
/* Add ReaderGroup to the created PubSubConnection */
retval = addReaderGroup(server);
if (retval != UA_STATUSCODE_GOOD)
goto cleanup;
/* Add DataSetReader to the created ReaderGroup */
retval |= addDataSetReader(server);
if (retval != UA_STATUSCODE_GOOD)
goto cleanup;
/* Add SubscribedVariables to the created DataSetReader */
retval |= addSubscribedVariables(server, readerIdent);
if (retval != UA_STATUSCODE_GOOD)
goto cleanup;
UA_ValueCallback cb;
cb.onRead = onRead;
cb.onWrite = onWrite;
UA_Server_setVariableNode_valueCallback(server, UA_NODEID_NUMERIC(1, 50000), cb);
retval = UA_Server_runUntilInterrupt(server);
cleanup:
UA_Server_delete(server);
return EXIT_SUCCESS;
}
I can clearly see that the messages are sent by monitoring the corresponding topic on the broker. However, the log on the subscriber side does not indicate that any messages are received and subsequently no nodes are updated. I hoped to see writing to the nodes by using callbacks, but this also shows nothing. The subscriber example provided by the lib itself lacks the call to UA_Server_enableDataSetReader
and hence does not reach the operational
state. However, this showed that the messages were received by the subscriber, but could not be processed. Therefore, it is not a MQTT problem.
Subscriber side log
[2024-08-14 11:58:09.767 (UTC+0200)] info/eventloop Starting the EventLoop
[2024-08-14 11:58:09.768 (UTC+0200)] warn/server AccessControl: Unconfigured AccessControl. Users have all permissions.
[2024-08-14 11:58:09.768 (UTC+0200)] info/server AccessControl: Anonymous login is enabled
[2024-08-14 11:58:09.768 (UTC+0200)] warn/server x509 Certificate Authentication configured, but no encrypting SecurityPolicy. This can leak credentials on the network.
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub PubSubConnection ns=1;i=52040 | Connection created
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub PubSubConnection ns=1;i=52040 | State change: Disabled -> PreOperational
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub PubSubConnection ns=1;i=52040 | ReaderGroup ns=1;i=50910 | ReaderGroup created
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub PubSubConnection ns=1;i=52040 | State change: PreOperational -> Operational
[2024-08-14 11:58:09.777 (UTC+0200)] info/network TCP 6 | Opening a connection to "127.0.0.1" on port 1883
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub PubSubConnection ns=1;i=52040 | ReaderGroup ns=1;i=50910 | State change: Disabled -> Operational
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub PubSubConnection ns=1;i=52040 | ReaderGroup ns=1;i=50910 | DataSetReader ns=1;i=50914 | DataSetReader created
[2024-08-14 11:58:09.777 (UTC+0200)] debug/pubsub PubSubConnection ns=1;i=52040 | ReaderGroup ns=1;i=50910 | DataSetReader ns=1;i=50914 | UA_PubSubComponent_createMonitoring(): Set MessageReceiveTimeout callback
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub PubSubConnection ns=1;i=52040 | ReaderGroup ns=1;i=50910 | DataSetReader ns=1;i=50914 | State change: Disabled -> Operational
[2024-08-14 11:58:09.777 (UTC+0200)] warn/server Maximum SecureChannels count not enough for the maximum Sessions count
[2024-08-14 11:58:09.777 (UTC+0200)] info/network TCP | Listening on all interfaces
[2024-08-14 11:58:09.778 (UTC+0200)] info/network TCP 8 | Creating listen socket for "0.0.0.0" (with local hostname "host") on port 4840
[2024-08-14 11:58:09.778 (UTC+0200)] info/server New DiscoveryUrl added: opc.tcp://host:4840
[2024-08-14 11:58:09.778 (UTC+0200)] info/network TCP 9 | Creating listen socket for "::" (with local hostname "host") on port 4840
[2024-08-14 11:58:09.778 (UTC+0200)] info/network MQTT 6001 | Created connection subscribed on topic "customTopic"
I would be grateful for any hints why this does not work. Thanks in advance!