I’m working on a general purpose threadpool library, and I’m running into an issue where my code is unexpectedly ending with a segmentation fault. I’ve tried debugging it, but I’m still not sure what’s causing the problem.
#include "common.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
typedef struct {
pthread_cond_t jobqueue_wait_cond;
pthread_mutex_t jobqueue_wait_mutex;
} jobqueue_sync_t;
typedef struct {
void *arg;
struct job_t *next;
void *(*thread_func)(void *arg);
} job_t;
typedef struct {
job_t *front;
job_t *rear;
int len;
pthread_mutex_t jobqueue_mutex;
jobqueue_sync_t *jobqueue_sync;
} jobqueue_t;
typedef struct {
pthread_t *thread;
volatile int num_threads_alive;
volatile int num_thread_working;
volatile int keep_threadpool_alive;
pthread_cond_t threadpool_thread_idle_cond;
pthread_mutex_t threadpool_thread_count_mutex;
jobqueue_t *jobqueue;
} threadpool_t;
/* ======================== FUNCTIONS ========================= */
// threadpool
threadpool_t *threadpool_t_init(int thread_num);
void threadpool_add_work(const threadpool_t *threadpool,void *(*worker_func)(void *arg),void *arg);
void threadpool_wait(threadpool_t *threadpool);
void threadpool_destroy(threadpool_t *threadpool);
int get_thread_alive(threadpool_t *threadpool);
// THREAD
static void thread_init(pthread_t *thread, threadpool_t *threadpool);
static void *thread_do(void *arg);
// jobqueue
static jobqueue_sync_t* jobqueue_sync_t_init();
static void jobqueue_sync_t_deinit(jobqueue_sync_t *jobqueue_sync);
static jobqueue_t* jobqueue_t_init();
static void jobqueue_t_deinit(jobqueue_t *jobqueue);
static void jobqueue_clear(jobqueue_t *jobqueue);
static job_t *jobqueue_pull(jobqueue_t *jobqueue);
void jobqueue_push(jobqueue_t *jobqueue, job_t *newjob);
// sync
static void jobqueue_sync_post(jobqueue_sync_t *jobqueue_sync);
static void jobqueue_sync_post_all(jobqueue_sync_t *jobqueue_sync);
static void jobqueue_sync_wait(jobqueue_sync_t *jobqueue_sync);
/* ======================== THREADPOOL ========================= */
// threadpool init
threadpool_t *threadpool_t_init(int thread_num) {
threadpool_t *threadpool = calloc(1, sizeof(threadpool_t));
CHECK(threadpool != NULL, "threadpool calloc");
CHECK(pthread_mutex_init(&threadpool->threadpool_thread_count_mutex, NULL) == 0, "threadpool_t mutex init");
CHECK(pthread_cond_init(&threadpool->threadpool_thread_idle_cond, NULL) == 0, "threadpool_t cond init");
threadpool->num_thread_working = 0;
threadpool->num_threads_alive = 0;
threadpool->keep_threadpool_alive = 1;
threadpool->thread = calloc(thread_num, sizeof(pthread_t));
CHECK(threadpool->thread, "threadpool_t thread init");
threadpool->jobqueue = jobqueue_t_init();
for (int i = 0; i < thread_num; i++) {
thread_init(&threadpool->thread[i], threadpool);
}
while (threadpool->num_threads_alive != thread_num) {}
return threadpool;
}
void threadpool_add_work(const threadpool_t *threadpool,void *(*worker_func)(void *arg),void *arg){
job_t *newjob = malloc(sizeof(job_t));
CHECK(newjob!=NULL,"newjob malloc");
newjob->thread_func = worker_func;
newjob->arg = arg;
newjob->next = NULL;
jobqueue_push(threadpool->jobqueue, newjob);
}
void threadpool_wait(threadpool_t *threadpool){
pthread_mutex_lock(&threadpool->threadpool_thread_count_mutex);
while(threadpool->jobqueue->len || threadpool->num_thread_working){
pthread_cond_wait(&threadpool->threadpool_thread_idle_cond,&threadpool->threadpool_thread_count_mutex);
}
pthread_mutex_unlock(&threadpool->threadpool_thread_count_mutex);
}
void threadpool_destroy(threadpool_t *threadpool){
if(threadpool==NULL){
return;
}
threadpool->keep_threadpool_alive = 0;
double timeout = 1.0;
time_t start,end;
double time_passed = 0.0;
time(&start);
while(time_passed<timeout && threadpool->num_threads_alive){
jobqueue_sync_post_all(threadpool->jobqueue->jobqueue_sync);
time(&end);
time_passed = difftime(end, start);
}
while(threadpool->num_threads_alive){
jobqueue_sync_post_all(threadpool->jobqueue->jobqueue_sync);
sleep(1);
}
jobqueue_t_deinit(threadpool->jobqueue);
free(threadpool->thread);
free(threadpool);
}
int get_thread_alive(threadpool_t *threadpool){
return threadpool->num_threads_alive;
}
/* ======================== THREAD ========================= */
// thread init
static void thread_init(pthread_t *thread, threadpool_t *threadpool) {
CHECK(pthread_create(thread, NULL, (void *(*)(void *))thread_do, threadpool) == 0, "new thread create");
CHECK(pthread_detach(*thread) == 0, "new thread detach");
}
// thread_do
static void *thread_do(void *arg) {
threadpool_t *threadpool = (threadpool_t *)arg;
pthread_mutex_lock(&threadpool->threadpool_thread_count_mutex);
threadpool->num_threads_alive++;
pthread_mutex_unlock(&threadpool->threadpool_thread_count_mutex);
while (threadpool->keep_threadpool_alive) {
job_t *job;
while ((job = jobqueue_pull(threadpool->jobqueue)) == NULL && threadpool->keep_threadpool_alive) {
jobqueue_sync_wait(threadpool->jobqueue->jobqueue_sync);
}
if (threadpool->keep_threadpool_alive) {
pthread_mutex_lock(&threadpool->threadpool_thread_count_mutex);
threadpool->num_thread_working++;
pthread_mutex_unlock(&threadpool->threadpool_thread_count_mutex);
job->thread_func(job->arg);
free(job);
pthread_mutex_lock(&threadpool->threadpool_thread_count_mutex);
threadpool->num_thread_working--;
if (!threadpool->num_thread_working) { pthread_cond_signal(&threadpool->threadpool_thread_idle_cond); }
pthread_mutex_unlock(&threadpool->threadpool_thread_count_mutex);
}
}
pthread_mutex_lock(&threadpool->threadpool_thread_count_mutex);
threadpool->num_threads_alive--;
pthread_mutex_unlock(&threadpool->threadpool_thread_count_mutex);
return NULL;
}
/* ======================== JOBQUEUE ========================= */
// jobqueue wait init
static jobqueue_sync_t* jobqueue_sync_t_init() {
jobqueue_sync_t *jobqueue_sync = malloc(sizeof(jobqueue_sync_t));
CHECK(jobqueue_sync!=NULL,"jobqueue sync malloc");
CHECK(pthread_cond_init(&jobqueue_sync->jobqueue_wait_cond, NULL) == 0, "job_queue_wait_sync_t cond init");
CHECK(pthread_mutex_init(&jobqueue_sync->jobqueue_wait_mutex, NULL) == 0, "job_queue_wait_sync_t mutex init");
return jobqueue_sync;
}
// jobqueue wait deinit
static void jobqueue_sync_t_deinit(jobqueue_sync_t *jobqueue_sync) {
CHECK(pthread_cond_destroy(&jobqueue_sync->jobqueue_wait_cond) == 0, "job_queue_wait_sync_t cond deinit");
CHECK(pthread_mutex_destroy(&jobqueue_sync->jobqueue_wait_mutex) == 0, "job_queue_wait_sync_t mutex deinit");
free(jobqueue_sync);
}
// job queue init
static jobqueue_t* jobqueue_t_init() {
jobqueue_t *jobqueue = calloc(1, sizeof(jobqueue_t));
CHECK(jobqueue != NULL, "jobqueue calloc");
jobqueue->front = NULL;
jobqueue->rear = NULL;
jobqueue->len = 0;
CHECK(pthread_mutex_init(&jobqueue->jobqueue_mutex, NULL) == 0, "jobqueue_t mutex init");
jobqueue->jobqueue_sync = jobqueue_sync_t_init();
return jobqueue;
}
// jobqueue deinit
static void jobqueue_t_deinit(jobqueue_t *jobqueue) {
jobqueue_sync_t_deinit(jobqueue->jobqueue_sync);
jobqueue_clear(jobqueue);
CHECK(pthread_mutex_destroy(&jobqueue->jobqueue_mutex) == 0, "jobqueu_t mutex deinit");
free(jobqueue);
}
// jobqueue pull
static job_t *jobqueue_pull(jobqueue_t *jobqueue) {
pthread_mutex_lock(&jobqueue->jobqueue_mutex);
job_t *job = jobqueue->front;
switch (jobqueue->len) {
case 0:
return NULL;
break;
case 1:
jobqueue->front = NULL;
jobqueue->rear = NULL;
jobqueue->len = 0;
break;
default:
jobqueue->front = (job_t *)job->next;
jobqueue->len--;
jobqueue_sync_post_all(jobqueue->jobqueue_sync);
}
pthread_mutex_unlock(&jobqueue->jobqueue_mutex);
return job;
}
// jobqueue push
void jobqueue_push(jobqueue_t *jobqueue, job_t *newjob) {
pthread_mutex_lock(&jobqueue->jobqueue_mutex);
switch (jobqueue->len) {
case 0:
jobqueue->front = newjob;
jobqueue->rear = newjob;
break;
default:
jobqueue->rear->next = (struct job_t *)newjob;
jobqueue->rear = newjob;
}
jobqueue->len++;
jobqueue_sync_post_all(jobqueue->jobqueue_sync);
pthread_mutex_unlock(&jobqueue->jobqueue_mutex);
}
// jobqueue clear
static void jobqueue_clear(jobqueue_t *jobqueue) {
while (jobqueue->len) {
free(jobqueue_pull(jobqueue));
}
jobqueue->front = NULL;
jobqueue->rear = NULL;
jobqueue->len = 0;
}
/* ======================== SYNCHRONISATION ========================= */
static void jobqueue_sync_post(jobqueue_sync_t *jobqueue_sync) {
pthread_mutex_lock(&jobqueue_sync->jobqueue_wait_mutex);
pthread_cond_signal(&jobqueue_sync->jobqueue_wait_cond);
pthread_mutex_unlock(&jobqueue_sync->jobqueue_wait_mutex);
}
static void jobqueue_sync_post_all(jobqueue_sync_t *jobqueue_sync) {
pthread_mutex_lock(&jobqueue_sync->jobqueue_wait_mutex);
pthread_cond_broadcast(&jobqueue_sync->jobqueue_wait_cond);
pthread_mutex_unlock(&jobqueue_sync->jobqueue_wait_mutex);
}
static void jobqueue_sync_wait(jobqueue_sync_t *jobqueue_sync) {
pthread_mutex_lock(&jobqueue_sync->jobqueue_wait_mutex);
pthread_cond_wait(&jobqueue_sync->jobqueue_wait_cond, NULL);
pthread_mutex_unlock(&jobqueue_sync->jobqueue_wait_mutex);
}
void *thread_func(void *arg){
printf("here");
return NULL;
}
int main(){
setbuf(stdout, 0);
printf("1");
threadpool_t *threadpool= threadpool_t_init(1);
printf("2");
for(int i=0;i<5;i++){
printf("3");
threadpool_add_work(threadpool, thread_func, NULL);
}
printf("4");
threadpool_wait(threadpool);
printf("5");
threadpool_destroy(threadpool);
printf("6");
}
i have tried debugging it with clion, it works perfectly sometimes but errors out in some. could you help me in debugging this program.
any help would be appreciated.