i am starting to learn using Posix Message Queues in C. (my host is Kubuntu 22.04)
I am Trying to write a program, that starts a provided number of threads that are waiting for messages (Message = struct that contains a Pointer to an allocated Memory) in the Queue. If there is a message, the fastest Thread takes the Message and process it somehow.
The main Program just starts the Threads, fills up the message queue and then waits till every thread finished, to display the results . But i always got the “Message too long”-error.
On my research i found out that this should be in general possible. But as i didn’t get a working program for days, i started breaking it down and i got the following code that is still not working because of the “Message too long”-error.
As i understand:
- for the message Queue there can be a maximum message size set (i chose the size of my struct: sizeof(message_t) ).
- For sending i am just using my own struct and the sizeof my struct.
- when reading a message i must tell the mq_receive the size of the message to read (which i also set to sizeof(message_t)
sending always works, and when i just sent one message to the queue, the content if the file (/dev/mqueue/test-123) showed me the correct QSIZE: 70 (20 + 50)
Am i missing something on here? Or am i just wrong that it is possible to put a struct into the message queue.
Additionally: As the only Communication through the Message Queue happens between parent process and threads, in my mind it should be safe to share addresses between them.
The reduced code looks like this:
#include <pthread.h>
#include <mqueue.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
typedef struct message {
//long int message_type;
char path[50];
char name[20];
} message_t;
int main(int argc, char *argv[])
{
const char* queue_name = "/test-123";
// Message Queue File Descriptor for Sending
mqd_t msgq_fd ;
// Attributes of the Message queue
struct mq_attr attr = { 0 };
attr.mq_maxmsg = 100;
attr.mq_msgsize = sizeof(message_t);
// (Create and) open the message Queue for Writing only
if((msgq_fd = mq_open(queue_name, O_RDWR | O_CREAT, 0660, &attr)) == -1) {
perror("Sender: mq_open failed");
exit(4);
}
//Put 5 Messages into the Message queue
for(int i=0; i < 5; i++)
{
message_t *msg = calloc(1,sizeof(message_t));
strcpy(msg->name,"test");
strcpy(msg->path,"testpath");
printf("Message %d prepared...", i);
// Error Sending
if(mq_send(msgq_fd, (const char *)msg, sizeof(message_t), 0) == -1) {
perror("Sender: unable to send message");
mq_close(msgq_fd);
exit(10);
}
printf("queuedn");
sleep(1);
}
printf("nReading messages from Queuen");
int ret = 0;
// Struct for storing the message from the Message Queue
message_t recv_msg;
// Read 5 messages from the Message Queue
for(int i=0; i < 5; i++)
{
printf("Message %d:n",i+1);
sleep(1);
ret = mq_receive(msgq_fd,(char*)&recv_msg,sizeof(attr.mq_msgsize),NULL);
// Error receiving
if(ret == -1) {
//printf("%d, %p, %ldn",msgq_fd2, &recv_msg, sizeof(message_t));
perror("Message receive error");
}
// Print the content of the received message
printf("%s/%snn",recv_msg.path,recv_msg.name);
}
return 0;
}
Basti is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.