I’m building a multi-threaded server in C using epoll, non-blocking sockets, and a custom thread pool. The server is supposed to handle multiple client connections concurrently. However, when I run two instances of a client application simultaneously, the server crashes.
Server Code (C):
This is the key part of my server code, where I handle incoming connections and delegate work to threads:
void esperar_peticion(int listenSock) {
int clientSock, returnCode;
struct epoll_event events[BACKLOG];
while (1) {
int cantFds = epoll_wait(epfd, events, BACKLOG, -1);
if (cantFds < 0) quit("epoll_wait");
for (int i = 0; i < cantFds; i++) {
if (events[i].data.fd == listenSock) {
clientSock = accept(listenSock, NULL, NULL);
if (clientSock < 0)
quit("accept");
else
printf("Connection establishedn");
struct epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = clientSock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, clientSock, &event) < 0) quit("epoll_ctl");
} else {
int fd = events[i].data.fd;
struct epoll_event event;
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event);
int *fd_ptr = malloc(sizeof(int));
if (fd_ptr == NULL) {
perror("malloc");
continue;
}
*fd_ptr = fd;
pool_submit(&handle_client, fd_ptr);
}
}
}
}
void handle_client(void *param) {
int clientSock = *((int*) param);
free(param);
int cmd = 0;
if (readn(clientSock, &cmd, 1) < 0) {
close(clientSock);
return;
}
switch(cmd){
case PUT:
handle_put(clientSock);
break;
case GET:
handle_get(clientSock);
break;
case DEL:
handle_del(clientSock);
break;
case STATS:
handle_stats(clientSock);
break;
case MEMORY:
rlim_t res = get_memory_limit();
char res_str[32];
snprintf(res_str, sizeof(res_str), "%lu", (unsigned long)res);
send_var(clientSock, strlen(res_str), res_str);
break;
default:
int res1 = EINVAL;
writen(clientSock, &res1, 1);
break;
}
struct epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = clientSock;
pthread_mutex_lock(&lock);
if (epoll_ctl(epfd, EPOLL_CTL_ADD, clientSock, &event) < 0) {
close(clientSock);
pthread_mutex_unlock(&lock);
quit("epoll_ctl");
}
pthread_mutex_unlock(&lock);
}
Thread Pool Implementation:
Here’s the implementation of the thread pool that manages the worker threads:
typedef struct task_t {
void (*function)(void* arg);
void* data;
struct task_t* next;
} task;
task* task_head = NULL;
task* task_tail = NULL;
pthread_mutex_t pool_mutex;
sem_t task_sem;
pthread_t* pool_array;
int NUMBER_OF_THREADS;
void pool_init(void) {
NUMBER_OF_THREADS = (int) sysconf(_SC_NPROCESSORS_ONLN);
printf("Num hardware threads: %in", NUMBER_OF_THREADS);
pool_array = malloc(NUMBER_OF_THREADS * sizeof(pthread_t));
pthread_mutex_init(&pool_mutex, NULL);
sem_init(&task_sem, 0, 0);
for(int i=0; i < NUMBER_OF_THREADS; i++) {
pthread_create(&pool_array[i], NULL, wait_for_tasks, NULL);
}
}
int pool_submit(void (*funcion)(void* p), void* arg) {
pthread_mutex_lock(&pool_mutex);
enqueue(funcion, arg);
pthread_mutex_unlock(&pool_mutex);
sem_post(&task_sem);
return 0;
}
void* wait_for_tasks(void* param) {
while(1) {
sem_wait(&task_sem);
pthread_mutex_lock(&pool_mutex);
task* tarea = dequeue();
pthread_mutex_unlock(&pool_mutex);
if (tarea != NULL) {
printf("Command being processed by thread %lun", (unsigned long)pthread_self());
execute_task(tarea->function, tarea->data);
free(tarea);
}
}
}
My Theories:
Race Condition: I suspect there might be a race condition where two threads try to handle the same socket or share data incorrectly.
Task Loss: Tasks might be getting lost in the thread pool, causing some connections to hang or crash.
What I’ve Tried:
- I’ve ensured that each client connection is handled by a separate thread.
- Added mutex locks around critical sections, especially where the socket is accepted and where tasks are enqueued and dequeued.
- The issue seems to occur specifically when running two instances of a client test application simultaneously. Below is the code for the test.c program that triggers the server crash when run concurrently:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
// Cantidad de operaciones PUT a ejecutarse
// del tipo PUT clavei valor i
#define MAX_ENTRIES 10000
void test() {
int pipe_fd[2];
if (pipe(pipe_fd) == -1) {
perror("pipe");
exit(EXIT_FAILURE);
}
pid_t pid = fork();
if (pid == -1) {
perror("fork");
exit(EXIT_FAILURE);
}
if (pid == 0) {
close(pipe_fd[1]);
// hacemos que stdin lea el pipe
// ya que client.out toma comandos desde stdin
dup2(pipe_fd[0], STDIN_FILENO);
close(pipe_fd[0]);
// ejecutamos ./client.out
execl("./client.out", "./client.out", NULL);
perror("execl");
exit(EXIT_FAILURE);
} else {
close(pipe_fd[0]);
FILE *pipe_write = fdopen(pipe_fd[1], "w");
if (!pipe_write) {
perror("fdopen");
exit(EXIT_FAILURE);
}
// Manda los comandos PUT
for (int i = 0; i < MAX_ENTRIES; ++i) {
fprintf(pipe_write, "PUT key%d value%dn", i, i);
fflush(pipe_write);
}
fclose(pipe_write);
int status;
waitpid(pid, &status, 0); //
if (WIFEXITED(status)) {
if (WEXITSTATUS(status) != 0) {
fprintf(stderr, "client.out fallo con %dn", WEXITSTATUS(status));
exit(EXIT_FAILURE);
}
} else {
fprintf(stderr, "client.out no termino como deben");
exit(EXIT_FAILURE);
}
}
}
int main() {
printf("Ejecutando operaciones PUT para llenar la cache..n");
test();
printf("Se realizaron las operaciones PUT.n");
return 0;
}
Ignacio Rimini is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
2