I have a producer consumer problem with PThreads.
The program has an array of threads. Each producer goes through the array and tries to lock a mutex. if the mutex is lockable, the corresponding thread is free. The problem is, without the usleep(1)
producers use threads that are not free. with the usleep(1)
it works as expected. I think there is a race condition and signaling a condition is not instant. But I am not sure and I don’t know what the best way to fix this is.
I have simplified the program and removed eveything that is not needed for the bug. If comments are needed to understand whats happening, please ask.
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define CONSUMER_THREAD_DOUNT 3
#define PRODUCER_THREAD_DOUNT 8
struct s_count_args {
int *g;
pthread_mutex_t mutex;
pthread_cond_t condition;
int id;
int finsihed;
};
struct s_count_args count_args[CONSUMER_THREAD_DOUNT];
pthread_mutex_t out_mutex;
pthread_mutex_t max_mutex;
int thread_counter = 0;
void gen_output() {
printf("producer: Thread start datan");
pthread_mutex_lock(&out_mutex);
int trylock_result;
do {
thread_counter++;
if (thread_counter >= CONSUMER_THREAD_DOUNT)
thread_counter = 0;
trylock_result = pthread_mutex_trylock(&count_args[thread_counter].mutex);
if (trylock_result != 0 && trylock_result != EBUSY) {
// This shouldnt happend:
fprintf(stderr, "Error: pthread_mutex_trylock failed with code %dn",
trylock_result);
pthread_mutex_unlock(&out_mutex);
exit(EXIT_FAILURE);
}
} while (trylock_result != 0);
pthread_mutex_unlock(&count_args[thread_counter].mutex);
printf("producer: Give Signal to consumer with id: %dn", thread_counter);
pthread_cond_signal(&count_args[thread_counter].condition);
pthread_mutex_unlock(&out_mutex);
return;
}
pthread_mutex_t mutex;
struct gen_args {
pthread_mutex_t mutex;
};
void *tf_geng_main(void *restrict v_gen_args) {
for (size_t i = 0; i < 30; i++) {
// with a sleep here, it works as expected. Without it does weird things.
// usleep(1);
gen_output();
}
return NULL;
}
void *tf_consumer(void *restrict v_count_args) {
struct s_count_args *count_args = (struct s_count_args *)v_count_args;
int count, i, j;
pthread_mutex_unlock(&count_args->mutex);
while (1) {
pthread_mutex_lock(&count_args->mutex);
printf("consumer: waiting on data:n");
pthread_cond_wait(&count_args->condition, &count_args->mutex);
if (count_args->finsihed) {
pthread_mutex_unlock(&count_args->mutex);
return NULL;
}
printf("consumer: Thread got data now proccess data. id: %dn",
count_args->id);
usleep(2); // At this point is the data proccessed
printf("consumer: Thread end proccess data and id:%dn", count_args->id);
pthread_mutex_unlock(&count_args->mutex);
}
}
int main(int argc, char *argv[]) {
int i;
if (pthread_mutex_init(&max_mutex, NULL)) {
perror("Failed to create max_mutex");
exit(EXIT_FAILURE);
}
if (pthread_mutex_init(&out_mutex, NULL)) {
perror("Failed to create out_mutex");
exit(EXIT_FAILURE);
}
pthread_t t_consumer[CONSUMER_THREAD_DOUNT];
// initialise all consumer threads and mutexes:
for (i = 0; i < CONSUMER_THREAD_DOUNT; i++) {
int ret = pthread_mutex_init(&count_args[i].mutex, NULL);
if (ret) {
perror("couldnt create mutexes");
exit(1);
}
ret = pthread_cond_init(&count_args[i].condition, NULL);
if (ret) {
perror("couldnt create condition");
exit(1);
}
pthread_mutex_lock(&count_args[i].mutex);
count_args[i].finsihed = 0;
count_args[i].id = i;
ret = pthread_create(&t_consumer[i], NULL, tf_consumer,
(void *)&count_args[i]);
if (ret) {
perror("couldnt create threads");
exit(1);
}
}
// initialise all producer threads and mutexes:
struct gen_args cmdlineargs[PRODUCER_THREAD_DOUNT];
pthread_t t_geng_main[PRODUCER_THREAD_DOUNT];
for (i = 0; i < PRODUCER_THREAD_DOUNT; i++) {
pthread_create(&t_geng_main[i], NULL, tf_geng_main,
(void *)&cmdlineargs[i]);
}
for (i = 0; i < PRODUCER_THREAD_DOUNT; i++) {
pthread_join(t_geng_main[i], NULL);
}
for (i = 0; i < CONSUMER_THREAD_DOUNT; i++) {
pthread_mutex_lock(&count_args[i].mutex);
count_args[i].finsihed = 1;
pthread_mutex_unlock(&count_args[i].mutex);
pthread_cond_signal(&count_args[i].condition);
pthread_join(t_consumer[i], NULL);
}
return EXIT_SUCCESS;
}
I expect that producer only take threads that are free but they take threads that are not free.
This output should not happen:
producer: Give Signal to consumer with id: 0
producer: Thread start data
producer: Give Signal to consumer with id: 0
producer: Thread start data
producer: Give Signal to consumer with id: 0
Huan Sohn is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.