Message Queue causing core dump when checking is message available

When I’m trying to check if new message is available in message queue, I’m getting core dump error.
I’m calling isDataAvailableInReceiveDataQueue method to check if data is available in message queue.
I’m using mq_getattr function to read number available of messages queue and checking the mq_curmsgs variable in struct mq_attr structure.
If attr.mq_curmsgs>0; I treat new data is available.
I’m getting core dump error on return true.

OS: vxWorks 7
C++17

Code :

ipc.hpp

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code>#pragma once
#include <queue>
#include <mutex>
#include <string>
#include <optional>
#include <iostream>
#include <functional> //For callback
//#include "common.hpp"
#include <signal.h> //Definition of SIGEV_* constants
#include <fcntl.h> //For O_RDWR, O_RDONLY, O_WRONLY constants
#include <mqueue.h> //For mode constants
#include <sys/stat.h> //For POSIX Message Queue
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
class Ipc
{
private:
using callback_t = std::function<void(void)>;
callback_t mCallback;
//Message Queue file descriptor
mqd_t mPrimaryToSecondaryMessageQueue {-1};
mqd_t mSecondaryToPrimaryMessageQueue {-1};
std::string mPrimaryToSecondaryMessageQueueName;
std::string mSecondaryToPrimaryMessageQueueName;
bool listen(void);
static void handler(union sigval sv);
bool open(bool noWaitDelay);
public:
Ipc() {}
bool open(const std::string& queueName, bool noWaitDelay = true);
bool close(void);
bool send(const T1& data, std::uint32_t priority = 10);
std::optional<T2> receive(void);
bool register_callback(callback_t callback_implementation);
bool isDataAvailableInReceiveDataQueue(void) const;
std::optional<std::uint32_t> getMessagesCountCurrentlyInQueue(void) const;
std::optional<std::uint32_t> getMessagesCountCurrentlyInLocalBuffer(void) const;
};
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::listen(void)
{
int returnStatus;
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = Ipc::handler;
sev.sigev_notify_attributes = nullptr;
sev.sigev_value.sival_ptr = this; // Arg. to thread function
if(primaryNode == true)
{
returnStatus = mq_notify(this->mSecondaryToPrimaryMessageQueue, &sev); //Listen on secondary to primary queue
}
else
{
returnStatus = mq_notify(this->mPrimaryToSecondaryMessageQueue, &sev); //Listen on primary to secondary queue
}
if(returnStatus == -1)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
return true;
}
template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline void Ipc<T1, T2, primaryNode, max_message_depth>::handler(sigval si)
{
try
{
mqd_t mqFd;
struct mq_attr attr;
Ipc *ipc = reinterpret_cast<Ipc *>(si.sival_ptr);
ipc->listen();
ipc->mCallback();
}
catch(const std::exception& e)
{
//Reinitialize Module
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << e.what() << std::endl;
// return;
}
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(const std::string& queueName, bool noWaitDelay)
{
try
{
if(primaryNode == true)
{
this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
}
else
{
this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
}
return this->open(noWaitDelay);
}
catch(const std::exception& e)
{
//Reinitialize module
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error Number : " << errno << " Error : " << errorToString(errno) << std::endl;
return false;
}
}
template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(bool noWaitDelay)
{
try
{
//int returnStatus = -1;
int primaryToSecondaryFlags = 0;
int secondaryToPrimaryFlags = 0;
struct mq_attr primaryToSecondaryAttr = {0};
struct mq_attr secondaryToPrimaryAttr = {0};
if(primaryNode == true) //Data sent from primary to secondary
{
primaryToSecondaryFlags = O_CREAT | O_WRONLY/* | O_NONBLOCK*/; //Primary node will only send data to secondary node - Non-Blocking
secondaryToPrimaryFlags = O_CREAT | O_RDONLY/* | O_NONBLOCK*/; //Secondary node will only receive data from primary node - Non-Blocking
if(noWaitDelay == true)
{
primaryToSecondaryFlags |= O_NONBLOCK;
secondaryToPrimaryFlags |= O_NONBLOCK;
}
primaryToSecondaryAttr.mq_flags = 0; //ignored for mq_open()
primaryToSecondaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
primaryToSecondaryAttr.mq_msgsize = sizeof(T1); //maximum message size in bytes to be sent
primaryToSecondaryAttr.mq_curmsgs = 0; //ignored for mq_open()
secondaryToPrimaryAttr.mq_flags = 0; //ignored for mq_open()
secondaryToPrimaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
secondaryToPrimaryAttr.mq_msgsize = sizeof(T2); //maximum message size in bytes to be received
secondaryToPrimaryAttr.mq_curmsgs = 0; //ignored for mq_open()
}
else //Data sent from Secondary to Primary node
{
primaryToSecondaryFlags = O_CREAT | O_RDONLY/* | O_NONBLOCK*/; //Primary node will only receive data from secondary node
secondaryToPrimaryFlags = O_CREAT | O_WRONLY/* | O_NONBLOCK*/; //Secondary node will only send data to primary node
if(noWaitDelay == true)
{
primaryToSecondaryFlags |= O_NONBLOCK;
secondaryToPrimaryFlags |= O_NONBLOCK;
}
primaryToSecondaryAttr.mq_flags = 0; //ignored for mq_open()
primaryToSecondaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
primaryToSecondaryAttr.mq_msgsize = sizeof(T2); //maximum message size in bytes to be received
primaryToSecondaryAttr.mq_curmsgs = 0; //ignored for mq_open()
secondaryToPrimaryAttr.mq_flags = 0; //ignored for mq_open()
secondaryToPrimaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
secondaryToPrimaryAttr.mq_msgsize = sizeof(T1); //maximum message size in bytes to be sent
secondaryToPrimaryAttr.mq_curmsgs = 0; //ignored for mq_open()
}
//Initialize Primary to Secondary Node
this->mPrimaryToSecondaryMessageQueue = mq_open(
this->mPrimaryToSecondaryMessageQueueName.c_str(),
primaryToSecondaryFlags,
S_IRWXU | S_IRWXG | S_IRWXO, // user (file owner), group and others have read, write, and execute permission
&primaryToSecondaryAttr
);
if(this->mPrimaryToSecondaryMessageQueue == static_cast<mqd_t>(-1))
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error Number : " << errno << " Error : " << errorToString(errno) << std::endl;
return false;
}
//Initialize Secondary to Primary Node
this->mSecondaryToPrimaryMessageQueue = mq_open(
this->mSecondaryToPrimaryMessageQueueName.c_str(),
secondaryToPrimaryFlags,
S_IRWXU | S_IRWXG | S_IRWXO,// user (file owner), group and others have read, write, and execute permission
&secondaryToPrimaryAttr
);
if(this->mSecondaryToPrimaryMessageQueue == static_cast<mqd_t>(-1))
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
return this->listen();
}
catch(const std::exception& e)
{
std::cerr << e.what() << 'n';
return false;
}
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::close(void)
{
try
{
int returnStatus = mq_close(this->mPrimaryToSecondaryMessageQueue);
if(returnStatus == -1)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
returnStatus = mq_close(this->mSecondaryToPrimaryMessageQueue);
if(returnStatus == -1)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
return true;
}
catch (const std::exception& e)
{
std::cout << e.what() << std::endl;
return false;
}
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::send(const T1& data, std::uint32_t priority)
{
mqd_t mqFd;
if (primaryNode == true) //Send message on Primary to Secondary Queue
{
mqFd = this->mPrimaryToSecondaryMessageQueue;
}
else //Send message on Secondary to Primary Queue
{
mqFd = this->mSecondaryToPrimaryMessageQueue;
}
int returnStatus = mq_send(
mqFd,
reinterpret_cast<const char *>(&data),
sizeof(data),
priority
);
if (returnStatus == -1)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
//std::cout<<"nSent Message without errorn";
return true;
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline std::optional<T2> Ipc<T1, T2, primaryNode, max_message_depth>::receive(void)
{
mqd_t mqFd;
T2 ipc_receive;
if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode
{
mqFd = this->mSecondaryToPrimaryMessageQueue;
}
else //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode)
{
mqFd = this->mPrimaryToSecondaryMessageQueue;
}
ssize_t receiveLength = mq_receive(
mqFd,
reinterpret_cast<char *>(&ipc_receive),
sizeof(ipc_receive),
nullptr
);
if(receiveLength == -1)
{
std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return std::nullopt;
}
return ipc_receive;
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::register_callback(callback_t callbackImplementation)
{
try
{
this->mCallback = callbackImplementation;
return true;
}
catch (const std::exception& e)
{
std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
}
return false;
}
template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::isDataAvailableInReceiveDataQueue(void) const
{
mqd_t mqFd;
struct mq_attr attr;
if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode
{
mqFd = this->mSecondaryToPrimaryMessageQueue;
}
else //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode)
{
mqFd = this->mPrimaryToSecondaryMessageQueue;
}
if (mq_getattr(mqFd, &attr) == -1)
{
std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
return false;
}
if (attr.mq_curmsgs > 0) //If 1 or more message are available
{
return true; //Core dump after executing this statement
}
else
{
return false;
}
}
template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline std::optional<std::uint32_t> Ipc<T1, T2, primaryNode, max_message_depth>::getMessagesCountCurrentlyInQueue(void) const
{
struct mq_attr attribute;
mqd_t mqFd;
if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode
{
mqFd = this->mSecondaryToPrimaryMessageQueue;
}
else //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode)
{
mqFd = this->mPrimaryToSecondaryMessageQueue;
}
if (mq_getattr(mqFd, &attribute) == -1)
{
perror("mq_getattr");
//Log Error
return std::nullopt;
}
return attribute.mq_curmsgs;
}
</code>
<code>#pragma once #include <queue> #include <mutex> #include <string> #include <optional> #include <iostream> #include <functional> //For callback //#include "common.hpp" #include <signal.h> //Definition of SIGEV_* constants #include <fcntl.h> //For O_RDWR, O_RDONLY, O_WRONLY constants #include <mqueue.h> //For mode constants #include <sys/stat.h> //For POSIX Message Queue template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> class Ipc { private: using callback_t = std::function<void(void)>; callback_t mCallback; //Message Queue file descriptor mqd_t mPrimaryToSecondaryMessageQueue {-1}; mqd_t mSecondaryToPrimaryMessageQueue {-1}; std::string mPrimaryToSecondaryMessageQueueName; std::string mSecondaryToPrimaryMessageQueueName; bool listen(void); static void handler(union sigval sv); bool open(bool noWaitDelay); public: Ipc() {} bool open(const std::string& queueName, bool noWaitDelay = true); bool close(void); bool send(const T1& data, std::uint32_t priority = 10); std::optional<T2> receive(void); bool register_callback(callback_t callback_implementation); bool isDataAvailableInReceiveDataQueue(void) const; std::optional<std::uint32_t> getMessagesCountCurrentlyInQueue(void) const; std::optional<std::uint32_t> getMessagesCountCurrentlyInLocalBuffer(void) const; }; template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline bool Ipc<T1, T2, primaryNode, max_message_depth>::listen(void) { int returnStatus; struct sigevent sev; sev.sigev_notify = SIGEV_THREAD; sev.sigev_notify_function = Ipc::handler; sev.sigev_notify_attributes = nullptr; sev.sigev_value.sival_ptr = this; // Arg. to thread function if(primaryNode == true) { returnStatus = mq_notify(this->mSecondaryToPrimaryMessageQueue, &sev); //Listen on secondary to primary queue } else { returnStatus = mq_notify(this->mPrimaryToSecondaryMessageQueue, &sev); //Listen on primary to secondary queue } if(returnStatus == -1) { std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl; return false; } return true; } template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline void Ipc<T1, T2, primaryNode, max_message_depth>::handler(sigval si) { try { mqd_t mqFd; struct mq_attr attr; Ipc *ipc = reinterpret_cast<Ipc *>(si.sival_ptr); ipc->listen(); ipc->mCallback(); } catch(const std::exception& e) { //Reinitialize Module std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << e.what() << std::endl; // return; } } template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(const std::string& queueName, bool noWaitDelay) { try { if(primaryNode == true) { this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out"); this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in"); } else { this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out"); this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in"); } return this->open(noWaitDelay); } catch(const std::exception& e) { //Reinitialize module std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error Number : " << errno << " Error : " << errorToString(errno) << std::endl; return false; } } template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(bool noWaitDelay) { try { //int returnStatus = -1; int primaryToSecondaryFlags = 0; int secondaryToPrimaryFlags = 0; struct mq_attr primaryToSecondaryAttr = {0}; struct mq_attr secondaryToPrimaryAttr = {0}; if(primaryNode == true) //Data sent from primary to secondary { primaryToSecondaryFlags = O_CREAT | O_WRONLY/* | O_NONBLOCK*/; //Primary node will only send data to secondary node - Non-Blocking secondaryToPrimaryFlags = O_CREAT | O_RDONLY/* | O_NONBLOCK*/; //Secondary node will only receive data from primary node - Non-Blocking if(noWaitDelay == true) { primaryToSecondaryFlags |= O_NONBLOCK; secondaryToPrimaryFlags |= O_NONBLOCK; } primaryToSecondaryAttr.mq_flags = 0; //ignored for mq_open() primaryToSecondaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue primaryToSecondaryAttr.mq_msgsize = sizeof(T1); //maximum message size in bytes to be sent primaryToSecondaryAttr.mq_curmsgs = 0; //ignored for mq_open() secondaryToPrimaryAttr.mq_flags = 0; //ignored for mq_open() secondaryToPrimaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue secondaryToPrimaryAttr.mq_msgsize = sizeof(T2); //maximum message size in bytes to be received secondaryToPrimaryAttr.mq_curmsgs = 0; //ignored for mq_open() } else //Data sent from Secondary to Primary node { primaryToSecondaryFlags = O_CREAT | O_RDONLY/* | O_NONBLOCK*/; //Primary node will only receive data from secondary node secondaryToPrimaryFlags = O_CREAT | O_WRONLY/* | O_NONBLOCK*/; //Secondary node will only send data to primary node if(noWaitDelay == true) { primaryToSecondaryFlags |= O_NONBLOCK; secondaryToPrimaryFlags |= O_NONBLOCK; } primaryToSecondaryAttr.mq_flags = 0; //ignored for mq_open() primaryToSecondaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue primaryToSecondaryAttr.mq_msgsize = sizeof(T2); //maximum message size in bytes to be received primaryToSecondaryAttr.mq_curmsgs = 0; //ignored for mq_open() secondaryToPrimaryAttr.mq_flags = 0; //ignored for mq_open() secondaryToPrimaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue secondaryToPrimaryAttr.mq_msgsize = sizeof(T1); //maximum message size in bytes to be sent secondaryToPrimaryAttr.mq_curmsgs = 0; //ignored for mq_open() } //Initialize Primary to Secondary Node this->mPrimaryToSecondaryMessageQueue = mq_open( this->mPrimaryToSecondaryMessageQueueName.c_str(), primaryToSecondaryFlags, S_IRWXU | S_IRWXG | S_IRWXO, // user (file owner), group and others have read, write, and execute permission &primaryToSecondaryAttr ); if(this->mPrimaryToSecondaryMessageQueue == static_cast<mqd_t>(-1)) { std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error Number : " << errno << " Error : " << errorToString(errno) << std::endl; return false; } //Initialize Secondary to Primary Node this->mSecondaryToPrimaryMessageQueue = mq_open( this->mSecondaryToPrimaryMessageQueueName.c_str(), secondaryToPrimaryFlags, S_IRWXU | S_IRWXG | S_IRWXO,// user (file owner), group and others have read, write, and execute permission &secondaryToPrimaryAttr ); if(this->mSecondaryToPrimaryMessageQueue == static_cast<mqd_t>(-1)) { std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl; return false; } return this->listen(); } catch(const std::exception& e) { std::cerr << e.what() << 'n'; return false; } } template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline bool Ipc<T1, T2, primaryNode, max_message_depth>::close(void) { try { int returnStatus = mq_close(this->mPrimaryToSecondaryMessageQueue); if(returnStatus == -1) { std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl; return false; } returnStatus = mq_close(this->mSecondaryToPrimaryMessageQueue); if(returnStatus == -1) { std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl; return false; } return true; } catch (const std::exception& e) { std::cout << e.what() << std::endl; return false; } } template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline bool Ipc<T1, T2, primaryNode, max_message_depth>::send(const T1& data, std::uint32_t priority) { mqd_t mqFd; if (primaryNode == true) //Send message on Primary to Secondary Queue { mqFd = this->mPrimaryToSecondaryMessageQueue; } else //Send message on Secondary to Primary Queue { mqFd = this->mSecondaryToPrimaryMessageQueue; } int returnStatus = mq_send( mqFd, reinterpret_cast<const char *>(&data), sizeof(data), priority ); if (returnStatus == -1) { std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl; return false; } //std::cout<<"nSent Message without errorn"; return true; } template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline std::optional<T2> Ipc<T1, T2, primaryNode, max_message_depth>::receive(void) { mqd_t mqFd; T2 ipc_receive; if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode { mqFd = this->mSecondaryToPrimaryMessageQueue; } else //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode) { mqFd = this->mPrimaryToSecondaryMessageQueue; } ssize_t receiveLength = mq_receive( mqFd, reinterpret_cast<char *>(&ipc_receive), sizeof(ipc_receive), nullptr ); if(receiveLength == -1) { std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl; return std::nullopt; } return ipc_receive; } template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline bool Ipc<T1, T2, primaryNode, max_message_depth>::register_callback(callback_t callbackImplementation) { try { this->mCallback = callbackImplementation; return true; } catch (const std::exception& e) { std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl; } return false; } template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline bool Ipc<T1, T2, primaryNode, max_message_depth>::isDataAvailableInReceiveDataQueue(void) const { mqd_t mqFd; struct mq_attr attr; if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode { mqFd = this->mSecondaryToPrimaryMessageQueue; } else //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode) { mqFd = this->mPrimaryToSecondaryMessageQueue; } if (mq_getattr(mqFd, &attr) == -1) { std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl; return false; } if (attr.mq_curmsgs > 0) //If 1 or more message are available { return true; //Core dump after executing this statement } else { return false; } } template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth> inline std::optional<std::uint32_t> Ipc<T1, T2, primaryNode, max_message_depth>::getMessagesCountCurrentlyInQueue(void) const { struct mq_attr attribute; mqd_t mqFd; if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode { mqFd = this->mSecondaryToPrimaryMessageQueue; } else //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode) { mqFd = this->mPrimaryToSecondaryMessageQueue; } if (mq_getattr(mqFd, &attribute) == -1) { perror("mq_getattr"); //Log Error return std::nullopt; } return attribute.mq_curmsgs; } </code>
#pragma once

#include <queue>
#include <mutex>
#include <string>
#include <optional>
#include <iostream>
#include <functional> //For callback
//#include "common.hpp"
#include <signal.h> //Definition of SIGEV_* constants
#include <fcntl.h> //For O_RDWR, O_RDONLY, O_WRONLY constants
#include <mqueue.h> //For mode constants
#include <sys/stat.h> //For POSIX Message Queue

template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
class Ipc
{
private:
    using callback_t = std::function<void(void)>;
    callback_t mCallback;
    
    //Message Queue file descriptor
    mqd_t                               mPrimaryToSecondaryMessageQueue {-1};
    mqd_t                               mSecondaryToPrimaryMessageQueue {-1};
    std::string                         mPrimaryToSecondaryMessageQueueName;
    std::string                         mSecondaryToPrimaryMessageQueueName;
    bool                                listen(void);
    static void                         handler(union sigval sv);
    bool                                open(bool noWaitDelay);
public:
    Ipc() {}
    bool                                open(const std::string& queueName, bool noWaitDelay = true);
    bool                                close(void);
    bool                                send(const T1& data, std::uint32_t priority = 10);
    std::optional<T2>                   receive(void);
    bool                                register_callback(callback_t callback_implementation);
    bool                                isDataAvailableInReceiveDataQueue(void) const;
    std::optional<std::uint32_t>        getMessagesCountCurrentlyInQueue(void) const;
    std::optional<std::uint32_t>        getMessagesCountCurrentlyInLocalBuffer(void) const;
};


template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::listen(void)
{
    int returnStatus;
    struct sigevent sev;
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = Ipc::handler;
    sev.sigev_notify_attributes = nullptr;
    sev.sigev_value.sival_ptr = this;   // Arg. to thread function
    if(primaryNode == true)
    {
        returnStatus = mq_notify(this->mSecondaryToPrimaryMessageQueue, &sev); //Listen on secondary to primary queue
    }
    else
    {
        returnStatus = mq_notify(this->mPrimaryToSecondaryMessageQueue, &sev); //Listen on primary to secondary queue
    }
    if(returnStatus == -1)
    {
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
        return false;
    }
    return true;
}

template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline void Ipc<T1, T2, primaryNode, max_message_depth>::handler(sigval si)
{
    try
    {
        mqd_t           mqFd;
        struct mq_attr  attr;
        
        Ipc *ipc = reinterpret_cast<Ipc *>(si.sival_ptr);

        ipc->listen();
        
        ipc->mCallback();
    }
    catch(const std::exception& e)
    {
        //Reinitialize Module
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << e.what() << std::endl;
        // return;
    }
}

template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(const std::string& queueName, bool noWaitDelay)
{
    try
    {
        if(primaryNode == true)
        {
            this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
            this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
        }
        else
        {
            this->mPrimaryToSecondaryMessageQueueName = queueName + std::string("_out");
            this->mSecondaryToPrimaryMessageQueueName = queueName + std::string("_in");
        }
        return this->open(noWaitDelay);
    }
    catch(const std::exception& e)
    {
        //Reinitialize module
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error Number : "  << errno << " Error : " << errorToString(errno) << std::endl;
        return false;
    }
}

template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::open(bool noWaitDelay)
{
    try
    {
        //int returnStatus = -1;
        int primaryToSecondaryFlags = 0;
        int secondaryToPrimaryFlags = 0;
        struct mq_attr primaryToSecondaryAttr = {0};
        struct mq_attr secondaryToPrimaryAttr = {0};
        if(primaryNode == true) //Data sent from primary to secondary
        {
            primaryToSecondaryFlags = O_CREAT | O_WRONLY/* | O_NONBLOCK*/; //Primary node will only send data to secondary node - Non-Blocking
            secondaryToPrimaryFlags = O_CREAT | O_RDONLY/* | O_NONBLOCK*/; //Secondary node will only receive data from primary node - Non-Blocking

            if(noWaitDelay == true)
            {
                primaryToSecondaryFlags |= O_NONBLOCK;
                secondaryToPrimaryFlags |= O_NONBLOCK;
            }

            primaryToSecondaryAttr.mq_flags = 0; //ignored for mq_open()
            primaryToSecondaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
            primaryToSecondaryAttr.mq_msgsize = sizeof(T1); //maximum message size in bytes to be sent
            primaryToSecondaryAttr.mq_curmsgs = 0; //ignored for mq_open()
            
            secondaryToPrimaryAttr.mq_flags = 0; //ignored for mq_open()
            secondaryToPrimaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
            secondaryToPrimaryAttr.mq_msgsize = sizeof(T2); //maximum message size in bytes to be received
            secondaryToPrimaryAttr.mq_curmsgs = 0; //ignored for mq_open()
        }
        else //Data sent from Secondary to Primary node
        {
            primaryToSecondaryFlags = O_CREAT | O_RDONLY/* | O_NONBLOCK*/; //Primary node will only receive data from secondary node
            secondaryToPrimaryFlags = O_CREAT | O_WRONLY/* | O_NONBLOCK*/; //Secondary node will only send data to primary node
            if(noWaitDelay == true)
            {
                primaryToSecondaryFlags |= O_NONBLOCK;
                secondaryToPrimaryFlags |= O_NONBLOCK;
            }

            primaryToSecondaryAttr.mq_flags = 0; //ignored for mq_open()
            primaryToSecondaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
            primaryToSecondaryAttr.mq_msgsize = sizeof(T2); //maximum message size in bytes to be received
            primaryToSecondaryAttr.mq_curmsgs = 0; //ignored for mq_open()
            
            secondaryToPrimaryAttr.mq_flags = 0; //ignored for mq_open()
            secondaryToPrimaryAttr.mq_maxmsg = max_message_depth; //maximum number of messages possible in queue
            secondaryToPrimaryAttr.mq_msgsize = sizeof(T1); //maximum message size in bytes to be sent
            secondaryToPrimaryAttr.mq_curmsgs = 0; //ignored for mq_open()
        }
        
        //Initialize Primary to Secondary Node
        this->mPrimaryToSecondaryMessageQueue = mq_open(
            this->mPrimaryToSecondaryMessageQueueName.c_str(), 
            primaryToSecondaryFlags,
            S_IRWXU | S_IRWXG | S_IRWXO, // user (file owner), group and others have read, write, and execute permission
            &primaryToSecondaryAttr
        );
        if(this->mPrimaryToSecondaryMessageQueue == static_cast<mqd_t>(-1))
        {
            std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error Number : "  << errno << " Error : " << errorToString(errno) << std::endl;
            return false;
        }

        //Initialize Secondary to Primary Node
        this->mSecondaryToPrimaryMessageQueue = mq_open(
            this->mSecondaryToPrimaryMessageQueueName.c_str(), 
            secondaryToPrimaryFlags,
            S_IRWXU | S_IRWXG | S_IRWXO,// user (file owner), group and others have read, write, and execute permission
            &secondaryToPrimaryAttr
        );
        if(this->mSecondaryToPrimaryMessageQueue == static_cast<mqd_t>(-1))
        {
            std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
            return false;
        }

        return this->listen();
    }
    catch(const std::exception& e)
    {
        std::cerr << e.what() << 'n';
        return false;
    }
}

template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::close(void)
{
    try
    {
        int returnStatus = mq_close(this->mPrimaryToSecondaryMessageQueue);
        if(returnStatus == -1)
        {
            std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
            return false;
        }

        returnStatus = mq_close(this->mSecondaryToPrimaryMessageQueue);
        if(returnStatus == -1)
        {
            std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
            return false;
        }
        return true;
    }
    catch (const std::exception& e)
    {
        std::cout << e.what() << std::endl;
        return false;
    }
}

template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::send(const T1& data, std::uint32_t priority)
{
    mqd_t mqFd;
    if (primaryNode == true) //Send message on Primary to Secondary Queue
    {
        mqFd = this->mPrimaryToSecondaryMessageQueue;
    }
    else //Send message on Secondary to Primary Queue
    {
        mqFd = this->mSecondaryToPrimaryMessageQueue;
    }
    int returnStatus = mq_send(
            mqFd,
            reinterpret_cast<const char *>(&data),
            sizeof(data),
            priority
        );
    if (returnStatus == -1)
    {
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
        return false;
    }

    //std::cout<<"nSent Message without errorn";
    return true;
}

template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline std::optional<T2> Ipc<T1, T2, primaryNode, max_message_depth>::receive(void)
{
    mqd_t   mqFd;
    T2      ipc_receive;
    if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode
    {
        mqFd = this->mSecondaryToPrimaryMessageQueue;
    }
    else  //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode)
    {
        mqFd = this->mPrimaryToSecondaryMessageQueue;
    }
    ssize_t receiveLength = mq_receive(
        mqFd,
        reinterpret_cast<char *>(&ipc_receive), 
        sizeof(ipc_receive),
        nullptr
    );
    if(receiveLength == -1)
    {
        std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
        return std::nullopt;
    }
    return ipc_receive;
}

template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::register_callback(callback_t callbackImplementation)
{
    try
    {
        this->mCallback = callbackImplementation;
        return true;
    }
    catch (const std::exception& e)
    {
        std::cout << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
    }
    return false;
}

template<typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline bool Ipc<T1, T2, primaryNode, max_message_depth>::isDataAvailableInReceiveDataQueue(void) const
{
    mqd_t           mqFd;
    struct mq_attr  attr;
    if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode
    {
        mqFd = this->mSecondaryToPrimaryMessageQueue;
    }
    else  //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode)
    {
        mqFd = this->mPrimaryToSecondaryMessageQueue;
    }
    if (mq_getattr(mqFd, &attr) == -1)
    {
        std::cerr << __LINE__ << " Primary Node : " << primaryNode << " Error : " << errorToString(errno) << std::endl;
        return false;
    }

    if (attr.mq_curmsgs > 0) //If 1 or more message are available
    {
        return true; //Core dump after executing this statement
    }
    else
    {
        return false;
    }
}

template <typename T1, typename T2, bool primaryNode, std::uint16_t max_message_depth>
inline std::optional<std::uint32_t> Ipc<T1, T2, primaryNode, max_message_depth>::getMessagesCountCurrentlyInQueue(void) const
{
    struct mq_attr attribute;
    mqd_t           mqFd;
    if(primaryNode == true) //Select secondaryToPrimaryQueue queue to receieve data in primary mode
    {
        mqFd = this->mSecondaryToPrimaryMessageQueue;
    }
    else  //Select primaryToSecondary queue to receive data in non-primary mode (secondary mode)
    {
        mqFd = this->mPrimaryToSecondaryMessageQueue;
    }
    if (mq_getattr(mqFd, &attribute) == -1)
    {
        perror("mq_getattr");
        //Log Error
        return std::nullopt;
    }
    return attribute.mq_curmsgs;
}

main.cpp

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code>#include <thread>
#include <iostream>
#include "ipc.hpp"
typedef struct ipc_data
{
int a;
}ipc_data_t;
Ipc<ipc_data_t,ipc_data_t,true,100>mq;
void thread_runner(void)
{
while (true)
{
if (mq.isDataAvailableInReceiveDataQueue() == true) //Core Dump on executing this step
{
std::optional<ipc_data_t> ipc_optional_data = mq.receive();
//Process further data
}
}
}
void callback(void)
{
//Singal Semaphore or Event
}
int main(int argc, char const *argv[])
{
mq.register_callback(callback);
bool openMessageQueueStatus = mq.open("mq1");
if (openMessageQueueStatus == true)
{
returnStatusInitialize = false;
}
else
{
//Log Error
returnStatusInitialize = false;
}
std::thread t {thread_runner};
while (true)
{
//Application Workflow
}
return 0;
}
</code>
<code>#include <thread> #include <iostream> #include "ipc.hpp" typedef struct ipc_data { int a; }ipc_data_t; Ipc<ipc_data_t,ipc_data_t,true,100>mq; void thread_runner(void) { while (true) { if (mq.isDataAvailableInReceiveDataQueue() == true) //Core Dump on executing this step { std::optional<ipc_data_t> ipc_optional_data = mq.receive(); //Process further data } } } void callback(void) { //Singal Semaphore or Event } int main(int argc, char const *argv[]) { mq.register_callback(callback); bool openMessageQueueStatus = mq.open("mq1"); if (openMessageQueueStatus == true) { returnStatusInitialize = false; } else { //Log Error returnStatusInitialize = false; } std::thread t {thread_runner}; while (true) { //Application Workflow } return 0; } </code>
#include <thread>
#include <iostream>
#include "ipc.hpp"

typedef struct ipc_data
{
    int a;
}ipc_data_t;

Ipc<ipc_data_t,ipc_data_t,true,100>mq;

void thread_runner(void)
{
    while (true)
    {
        if (mq.isDataAvailableInReceiveDataQueue() == true) //Core Dump on executing this step
        {
            std::optional<ipc_data_t> ipc_optional_data = mq.receive();
            //Process further data
        }
    }
}

void callback(void)
{
    //Singal Semaphore or Event
}

int main(int argc, char const *argv[])
{
    mq.register_callback(callback);
    bool openMessageQueueStatus = mq.open("mq1");
    if (openMessageQueueStatus == true)
    {
        returnStatusInitialize = false;
    }
    else
    {
        //Log Error
        returnStatusInitialize = false;
    }   
    std::thread t {thread_runner};
    while (true)
    {
       //Application Workflow
    }
    
    return 0;
}

4

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật