open62541 MQTT subscriber receives but does not process messages

I am trying to get the open62541 MQTT publish/subscribe mechanism to work. I tried using the provided examples (tutorial_pubsub_mqtt_publish.c and tutorial_pubsub_mqtt_subscribe.c) as a baseline, but the subscriber example seems to have issues. At the moment, I am able to send messages, but the messages do not seem to update the subscriber-side nodes upon receiving them. I am running the two applications locally in combination with the mosquitto MQTT broker. The publisher side code is taken from the aforementioned example code file, while the subscriber code was slightly modified:

Subscriber side code

#include <open62541/common.h>
#include <open62541/plugin/log.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/plugin/nodestore.h>
#include <open62541/server.h>
#include <open62541/server_pubsub.h>
#include <open62541/plugin/securitypolicy_default.h>

#include <open62541/types.h>
#include <stdio.h>

#define CONNECTION_NAME               "MQTT Subscriber Connection"
#define TRANSPORT_PROFILE_URI_UADP    "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-uadp"
#define TRANSPORT_PROFILE_URI_JSON    "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-json"
#define MQTT_CLIENT_ID                "TESTCLIENTPUBSUBMQTTSUBSCRIBE"
#define CONNECTIONOPTION_NAME         "mqttClientId"
#define SUBSCRIBER_TOPIC              "customTopic"
#define SUBSCRIBER_METADATAQUEUENAME  "MetaDataTopic"
#define SUBSCRIBER_METADATAUPDATETIME 0
#define BROKER_ADDRESS_URL            "opc.mqtt://127.0.0.1:1883"

static UA_Boolean useJson = false;

UA_NodeId connectionIdent;
UA_NodeId readerGroupIdent;
UA_NodeId readerIdent;

UA_DataSetReaderConfig readerConfig;

static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData);

static UA_StatusCode
addPubSubConnection(UA_Server *server, char *addressUrl) {
    UA_StatusCode retval = UA_STATUSCODE_GOOD;

    UA_PubSubConnectionConfig connectionConfig;
    memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
    connectionConfig.name = UA_STRING(CONNECTION_NAME);
    if (useJson) {
        connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI_JSON);
    } else {
        connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI_UADP);
    }
    connectionConfig.enabled = UA_TRUE;

    /* configure address of the mqtt broker (local on default port) */
    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING(addressUrl)};
    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
    connectionConfig.publisherId.idType = UA_PUBLISHERIDTYPE_UINT16;
    connectionConfig.publisherId.id.uint16 = 2234;

    UA_KeyValuePair connectionOptions[1];

    UA_String mqttClientId = UA_STRING(MQTT_CLIENT_ID);
    connectionOptions[0].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
    UA_Variant_setScalar(&connectionOptions[0].value, &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);
    connectionConfig.connectionProperties.map = connectionOptions;
    connectionConfig.connectionProperties.mapSize = 1;

    retval |= UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);

    return retval;
}

/* Add ReaderGroup to the created connection */
static UA_StatusCode
addReaderGroup(UA_Server *server) {
    if(server == NULL) {
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    UA_ReaderGroupConfig readerGroupConfig;
    memset(&readerGroupConfig, 0, sizeof(UA_ReaderGroupConfig));
    readerGroupConfig.name = UA_STRING("ReaderGroup1");
    if(useJson)
        readerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;

    /* configure the mqtt publish topic */
    UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
    /* Assign the Topic at which MQTT publish should happen */
    brokerTransportSettings.queueName = UA_STRING(SUBSCRIBER_TOPIC);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;

    /* Choose the QOS Level for MQTT */
    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;

    /* Encapsulate config in transportSettings */
    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERDATASETREADERTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;

    readerGroupConfig.transportSettings = transportSettings;

    retval |= UA_Server_addReaderGroup(server, connectionIdent, &readerGroupConfig,
                                       &readerGroupIdent);
    UA_Server_enableReaderGroup(server, readerGroupIdent);

    return retval;
}

/* Add DataSetReader to the ReaderGroup */
static UA_StatusCode
addDataSetReader(UA_Server *server) {
    if(server == NULL) {
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    memset(&readerConfig, 0, sizeof(UA_DataSetReaderConfig));
    readerConfig.name = UA_STRING("DataSet Reader 1");
    /* Parameters to filter which DataSetMessage has to be processed
     * by the DataSetReader */
    /* The following parameters are used to show that the data published by
     * tutorial_pubsub_mqtt_publish.c is being subscribed and is being updated in
     * the information model */
    UA_UInt16 publisherIdentifier = 2234;
    readerConfig.publisherId.idType = UA_PUBLISHERIDTYPE_UINT16;
    readerConfig.publisherId.id.uint16 = publisherIdentifier;
    readerConfig.writerGroupId    = 100;
    readerConfig.dataSetWriterId  = 62541;
#ifdef UA_ENABLE_PUBSUB_MONITORING
    readerConfig.messageReceiveTimeout = 10;
#endif
    /* Setting up Meta data configuration in DataSetReader */
    fillTestDataSetMetaData(&readerConfig.dataSetMetaData);
    retval |= UA_Server_addDataSetReader(server, readerGroupIdent, &readerConfig,
                                         &readerIdent);
    UA_Server_enableDataSetReader(server, readerIdent);
    return retval;
}

static UA_StatusCode
addSubscribedVariables (UA_Server *server, UA_NodeId dataSetReaderId) {
    if(server == NULL)
        return UA_STATUSCODE_BADINTERNALERROR;

    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    UA_NodeId folderId;
    UA_String folderName = readerConfig.dataSetMetaData.name;
    UA_ObjectAttributes oAttr = UA_ObjectAttributes_default;
    UA_QualifiedName folderBrowseName;
    if(folderName.length > 0) {
        oAttr.displayName.locale = UA_STRING ("en-US");
        oAttr.displayName.text = folderName;
        folderBrowseName.namespaceIndex = 1;
        folderBrowseName.name = folderName;
    }
    else {
        oAttr.displayName = UA_LOCALIZEDTEXT ("en-US", "Subscribed Variables");
        folderBrowseName = UA_QUALIFIEDNAME (1, "Subscribed Variables");
    }

    UA_Server_addObjectNode(server, UA_NODEID_NULL, UA_NS0ID(OBJECTSFOLDER),
                            UA_NS0ID(ORGANIZES), folderBrowseName,
                            UA_NS0ID(BASEOBJECTTYPE), oAttr, NULL, &folderId);

/**
 * **TargetVariables**
 *
 * The SubscribedDataSet option TargetVariables defines a list of Variable mappings between
 * received DataSet fields and target Variables in the Subscriber AddressSpace.
 * The values subscribed from the Publisher are updated in the value field of these variables */
    /* Create the TargetVariables with respect to DataSetMetaData fields */
    UA_FieldTargetVariable *targetVars = (UA_FieldTargetVariable *)
        UA_calloc(readerConfig.dataSetMetaData.fieldsSize, sizeof(UA_FieldTargetVariable));
    for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++) {
        /* Variable to subscribe data */
        UA_VariableAttributes vAttr = UA_VariableAttributes_default;
        UA_LocalizedText_copy(&readerConfig.dataSetMetaData.fields[i].description,
                              &vAttr.description);
        vAttr.displayName.locale = UA_STRING("en-US");
        vAttr.displayName.text = readerConfig.dataSetMetaData.fields[i].name;
        vAttr.dataType = readerConfig.dataSetMetaData.fields[i].dataType;

        UA_NodeId newNode;
        retval |= UA_Server_addVariableNode(server, UA_NODEID_NUMERIC(1, (UA_UInt32)i + 50000),
                                            folderId, UA_NS0ID(HASCOMPONENT),
                                            UA_QUALIFIEDNAME(1, (char *)readerConfig.dataSetMetaData.fields[i].name.data),
                                            UA_NS0ID(BASEDATAVARIABLETYPE),
                                            vAttr, NULL, &newNode);

        /* For creating Targetvariables */
        UA_FieldTargetDataType_init(&targetVars[i].targetVariable);
        targetVars[i].targetVariable.attributeId  = UA_ATTRIBUTEID_VALUE;
        targetVars[i].targetVariable.targetNodeId = newNode;
    }

    retval = UA_Server_DataSetReader_createTargetVariables(server, dataSetReaderId,
                                                           readerConfig.dataSetMetaData.fieldsSize, targetVars);
    for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++)
        UA_FieldTargetDataType_clear(&targetVars[i].targetVariable);

    UA_free(targetVars);
    UA_free(readerConfig.dataSetMetaData.fields);
    return retval;
}

/**
 * **DataSetMetaData**
 *
 * The DataSetMetaData describes the content of a DataSet. It provides the information necessary to decode
 * DataSetMessages on the Subscriber side. DataSetMessages received from the Publisher are decoded into
 * DataSet and each field is updated in the Subscriber based on datatype match of TargetVariable fields of Subscriber
 * and PublishedDataSetFields of Publisher */
/* Define MetaData for TargetVariables */
static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData) {
    if(pMetaData == NULL) {
        return;
    }

    UA_DataSetMetaDataType_init (pMetaData);
    pMetaData->name = UA_STRING ("DataSet 1");

    /* Static definition of number of fields size to 4 to create four different
     * targetVariables of distinct datatype
     * Currently the publisher sends only DateTime data type */
    pMetaData->fieldsSize = 4;
    pMetaData->fields = (UA_FieldMetaData*)UA_Array_new (pMetaData->fieldsSize,
                                                         &UA_TYPES[UA_TYPES_FIELDMETADATA]);

    /* DateTime DataType */
    UA_FieldMetaData_init (&pMetaData->fields[0]);
    UA_NodeId_copy (&UA_TYPES[UA_TYPES_DATETIME].typeId,
                    &pMetaData->fields[0].dataType);
    pMetaData->fields[0].builtInType = UA_NS0ID_DATETIME;
    pMetaData->fields[0].name =  UA_STRING ("DateTime");
    pMetaData->fields[0].valueRank = -1; /* scalar */

    /* Int32 DataType */
    UA_FieldMetaData_init (&pMetaData->fields[1]);
    UA_NodeId_copy(&UA_TYPES[UA_TYPES_INT32].typeId,
                   &pMetaData->fields[1].dataType);
    pMetaData->fields[1].builtInType = UA_NS0ID_INT32;
    pMetaData->fields[1].name =  UA_STRING ("Int32");
    pMetaData->fields[1].valueRank = -1; /* scalar */

    /* Int64 DataType */
    UA_FieldMetaData_init (&pMetaData->fields[2]);
    UA_NodeId_copy(&UA_TYPES[UA_TYPES_INT64].typeId,
                   &pMetaData->fields[2].dataType);
    pMetaData->fields[2].builtInType = UA_NS0ID_INT64;
    pMetaData->fields[2].name =  UA_STRING ("Int64");
    pMetaData->fields[2].valueRank = -1; /* scalar */

    /* Boolean DataType */
    UA_FieldMetaData_init (&pMetaData->fields[3]);
    UA_NodeId_copy (&UA_TYPES[UA_TYPES_BOOLEAN].typeId,
                    &pMetaData->fields[3].dataType);
    pMetaData->fields[3].builtInType = UA_NS0ID_BOOLEAN;
    pMetaData->fields[3].name =  UA_STRING ("BoolToggle");
    pMetaData->fields[3].valueRank = -1; /* scalar */
}

static void onRead(UA_Server *server,
                           const UA_NodeId *sessionId, void *sessionContext,
                           const UA_NodeId *nodeid, void *nodeContext,
                           const UA_NumericRange *range, const UA_DataValue *data)
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Variable read");
}


static void onWrite(UA_Server *server,
                          const UA_NodeId *sessionId, void *sessionContext,
                          const UA_NodeId *nodeId, void *nodeContext,
                          const UA_NumericRange *range, const UA_DataValue *data)
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Variable write");
}
static void usage(void) {
    printf("Usage: tutorial_pubsub_mqtt_subscribe [--url <opc.mqtt://hostname:port>] "
           "[--json]n"
           "  Defaults are:n"
           "  - Url: opc.mqtt://127.0.0.1:1883n"
           "  - JSON: Offn");
}

int main(int argc, char **argv) {
    char *addressUrl = BROKER_ADDRESS_URL;

    /* Parse arguments */
    for(int argpos = 1; argpos < argc; argpos++) {
        if(strcmp(argv[argpos], "--help") == 0) {
            usage();
            return 0;
        }

        if(strcmp(argv[argpos], "--json") == 0) {
            useJson = true;
            continue;
        }

        if(strcmp(argv[argpos], "--url") == 0) {
            if(argpos + 1 == argc) {
                usage();
                return -1;
            }
            argpos++;
            addressUrl = argv[argpos];
            continue;
        }
    }

    /* Return value initialized to Status Good */
    UA_Server *server = UA_Server_new();

    UA_ServerConfig *config = UA_Server_getConfig(server);
    config->logging = UA_Log_Stdout;

    /* API calls */
    /* Add PubSubConnection */
    UA_StatusCode retval = addPubSubConnection(server, addressUrl);
    if (retval != UA_STATUSCODE_GOOD)
        goto cleanup;

    /* Add ReaderGroup to the created PubSubConnection */
    retval = addReaderGroup(server);
    if (retval != UA_STATUSCODE_GOOD)
        goto cleanup;

    /* Add DataSetReader to the created ReaderGroup */
    retval |= addDataSetReader(server);
    if (retval != UA_STATUSCODE_GOOD)
        goto cleanup;

    /* Add SubscribedVariables to the created DataSetReader */
    retval |= addSubscribedVariables(server, readerIdent);
    if (retval != UA_STATUSCODE_GOOD)
        goto cleanup;

    UA_ValueCallback cb;
    cb.onRead = onRead;
    cb.onWrite = onWrite;
    UA_Server_setVariableNode_valueCallback(server, UA_NODEID_NUMERIC(1, 50000), cb);

    retval = UA_Server_runUntilInterrupt(server);
cleanup:
    UA_Server_delete(server);
    return EXIT_SUCCESS;
}

I can clearly see that the messages are sent by monitoring the corresponding topic on the broker. However, the log on the subscriber side does not indicate that any messages are received and subsequently no nodes are updated. I hoped to see writing to the nodes by using callbacks, but this also shows nothing. The subscriber example provided by the lib itself lacks the call to UA_Server_enableDataSetReader and hence does not reach the operational state. However, this showed that the messages were received by the subscriber, but could not be processed. Therefore, it is not a MQTT problem.

Subscriber side log

[2024-08-14 11:58:09.767 (UTC+0200)] info/eventloop     Starting the EventLoop
[2024-08-14 11:58:09.768 (UTC+0200)] warn/server        AccessControl: Unconfigured AccessControl. Users have all permissions.
[2024-08-14 11:58:09.768 (UTC+0200)] info/server        AccessControl: Anonymous login is enabled
[2024-08-14 11:58:09.768 (UTC+0200)] warn/server        x509 Certificate Authentication configured, but no encrypting SecurityPolicy. This can leak credentials on the network.
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub        PubSubConnection ns=1;i=52040   | Connection created
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub        PubSubConnection ns=1;i=52040   | State change: Disabled -> PreOperational
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub        PubSubConnection ns=1;i=52040   | ReaderGroup ns=1;i=50910      | ReaderGroup created
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub        PubSubConnection ns=1;i=52040   | State change: PreOperational -> Operational
[2024-08-14 11:58:09.777 (UTC+0200)] info/network       TCP 6   | Opening a connection to "127.0.0.1" on port 1883
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub        PubSubConnection ns=1;i=52040   | ReaderGroup ns=1;i=50910      | State change: Disabled -> Operational
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub        PubSubConnection ns=1;i=52040   | ReaderGroup ns=1;i=50910      | DataSetReader ns=1;i=50914 | DataSetReader created
[2024-08-14 11:58:09.777 (UTC+0200)] debug/pubsub       PubSubConnection ns=1;i=52040   | ReaderGroup ns=1;i=50910      | DataSetReader ns=1;i=50914 | UA_PubSubComponent_createMonitoring(): Set MessageReceiveTimeout callback
[2024-08-14 11:58:09.777 (UTC+0200)] info/pubsub        PubSubConnection ns=1;i=52040   | ReaderGroup ns=1;i=50910      | DataSetReader ns=1;i=50914 | State change: Disabled -> Operational
[2024-08-14 11:58:09.777 (UTC+0200)] warn/server        Maximum SecureChannels count not enough for the maximum Sessions count
[2024-08-14 11:58:09.777 (UTC+0200)] info/network       TCP     | Listening on all interfaces
[2024-08-14 11:58:09.778 (UTC+0200)] info/network       TCP 8   | Creating listen socket for "0.0.0.0" (with local hostname "host") on port 4840
[2024-08-14 11:58:09.778 (UTC+0200)] info/server        New DiscoveryUrl added: opc.tcp://host:4840
[2024-08-14 11:58:09.778 (UTC+0200)] info/network       TCP 9   | Creating listen socket for "::" (with local hostname "host") on port 4840
[2024-08-14 11:58:09.778 (UTC+0200)] info/network       MQTT 6001       | Created connection subscribed on topic "customTopic"

I would be grateful for any hints why this does not work. Thanks in advance!

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật