epoll(7) reads (emphasis is mine):
If multiple threads (or processes, if child processes have
inherited the epoll file descriptor across fork(2)) are blocked
in epoll_wait(2) waiting on the same epoll file descriptor and a
file descriptor in the interest list that is marked for edge-triggered (EPOLLET) notification becomes ready, just one of the
threads (or processes) is awoken from epoll_wait(2). This
provides a useful optimization for avoiding “thundering herd”
wake-ups in some scenarios.
Would you be so kind as to tell me whether I have understood correctly that this applies only when a single epoll fd is shared globally among the threads? That would in turn imply that it does not hold when each thread has its own separate epoll fd. Is that so?
Thanks!
Below are two sample programs with 1 event and 10 threads: one uses a single global epoll fd, while the other creates a separate epoll fd for each thread. The former seems to behave exactly as the epoll(7) man page specifies, while the latter does not (see how many threads are woken up by that single event, as indicated by the "WOKEN UP - [...]" lines).
epoll_global_fd:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <string.h>
/* Number of worker threads main() creates. */
#define NUM_THREADS 10
/* Number of writes main() issues on the pipe. */
#define NUM_EVENTS 1
/* Pipe descriptors filled in by pipe(): workers read p[0], main() writes p[1]. */
int p[2];
/* The epoll instance created in main() and waited on by every worker. */
int ep_src_fd;
/* Handles of the worker threads, indexed by worker id. */
pthread_t threads[NUM_THREADS];
/* Per-worker count of complete 4-byte reads from the pipe, indexed by worker id. */
int event_count[NUM_THREADS];
/* Event description for the pipe's read end, statically initialised to EPOLLIN. */
struct epoll_event evt = {.events = EPOLLIN};
/*
 * Report a fatal error and terminate the process.
 *
 * msg is handed to perror(), which prints it together with the description
 * of the current errno value; the process then exits with status -1.
 * This function does not return.
 */
void die(const char *msg) {
    perror(msg);
    exit(-1);
}
/*
 * Worker thread body for the shared-epoll-fd experiment.
 *
 * ptr points to an int holding this worker's id (its index into
 * event_count[]). The worker blocks in epoll_wait() on the shared descriptor
 * ep_src_fd, logs every wake-up, then tries to consume one 4-byte message
 * from the pipe and counts it on success.
 *
 * The loop only ends if epoll_wait() fails; otherwise the thread runs until
 * main() cancels it. Always returns NULL.
 */
void *run_func(void *ptr) {
    const int id = *(const int *)ptr;
    /*
     * Private result buffer. The capacity passed to epoll_wait() must match
     * the storage actually provided, and the buffer must not be shared with
     * other threads that are waiting concurrently.
     */
    struct epoll_event ready;
    char buf[4];

    while (1) {
        int nready = epoll_wait(ep_src_fd, &ready, 1, -1);
        if (nready < 0) {
            perror("epoll_wait");
            break;
        }
        printf("WOKEN UP - id %d, ret %d\n", id, nready);
        fflush(stdout);

        /*
         * p[0] is a blocking descriptor: a worker that was woken but finds
         * the pipe already drained stays blocked here until it is cancelled.
         */
        ssize_t nread = read(p[0], buf, sizeof buf);
        if (nread == (ssize_t)sizeof buf) {
            event_count[id]++;
        }
    }
    return NULL;
}
/*
 * Driver for the shared-epoll-fd experiment.
 *
 * Creates the pipe and one epoll instance, registers the pipe's read end for
 * edge-triggered EPOLLIN, starts NUM_THREADS workers that all wait on that
 * one epoll descriptor, writes NUM_EVENTS messages into the pipe, and finally
 * cancels and joins the workers before printing per-thread and total counts.
 *
 * Returns 0 on success; any setup failure terminates the process with
 * status -1.
 */
int main(int argc, char *argv[]) {
    int id[NUM_THREADS];
    int total = 0;
    int nohit = 0;
    int i, j, err;

    (void)argc;
    (void)argv;

    if (pipe(p) < 0) {
        die("pipe");
    }
    if ((ep_src_fd = epoll_create(1)) < 0) {
        die("create");
    }

    evt.events |= EPOLLET;
    /* Without a registered descriptor no worker could ever wake up. */
    if (epoll_ctl(ep_src_fd, EPOLL_CTL_ADD, p[0], &evt) < 0) {
        die("epoll_ctl add");
    }

    for (i = 0; i < NUM_THREADS; ++i) {
        id[i] = i;
        /* pthread functions return the error number instead of setting errno. */
        err = pthread_create(&threads[i], NULL, run_func, &id[i]);
        if (err != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(err));
            exit(-1);
        }
    }

    for (j = 0; j < NUM_EVENTS; ++j) {
        /* The payload is irrelevant; send the first int of p as before. */
        if (write(p[1], &p[0], sizeof p[0]) != (ssize_t)sizeof p[0]) {
            die("write");
        }
        usleep(100);
    }

    for (i = 0; i < NUM_THREADS; ++i) {
        pthread_cancel(threads[i]);
        /*
         * Cancellation is only a request; join so the worker has really
         * stopped before its counter is read.
         */
        err = pthread_join(threads[i], NULL);
        if (err != 0) {
            fprintf(stderr, "pthread_join: %s\n", strerror(err));
            exit(-1);
        }
        printf("joined: %d\n", i);
        printf("event count: %d\n", event_count[i]);
        total += event_count[i];
        if (!event_count[i]) {
            nohit++;
        }
    }

    printf("total events is: %d\n", total);
    printf("nohit is: %d\n", nohit);
    return 0;
}
epoll_separate_fd:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
/* Number of worker threads main() creates. */
#define NUM_THREADS 10
/* Number of writes main() issues on the pipe. */
#define NUM_EVENTS 1
/* Pipe descriptors filled in by pipe(): workers read p[0], main() writes p[1]. */
int p[2];
/* Handles of the worker threads, indexed by worker id. */
pthread_t threads[NUM_THREADS];
/* Per-worker count of complete 4-byte reads from the pipe, indexed by worker id. */
int event_count[NUM_THREADS];
/* Event description for the pipe's read end, statically initialised to EPOLLIN. */
struct epoll_event evt = {.events = EPOLLIN};
/*
 * Report a fatal error and terminate the process.
 *
 * msg is handed to perror(), which prints it together with the description
 * of the current errno value; the process then exits with status -1.
 * This function does not return.
 */
void die(const char *msg) {
    perror(msg);
    exit(-1);
}
void *run_func(void *ptr) {
int i = 0;
int j = 0;
int ret;
int epfd;
char buf[4];
int id = *(int *)ptr;
int *contents;
if ((epfd = epoll_create(1)) < 0) {
die("create");
}
evt.events |= EPOLLET;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &evt);
if (ret) {
perror("epoll_ctl add error!n");
}
while (1) {
ret = epoll_wait(epfd, &evt, 10000, -1);
printf("WOKEN UP - id %d, ret %dn", id, ret);
fflush(stdout);
ret = read(p[0], buf, sizeof(int));
if (ret == 4) {
event_count[id]++;
}
}
}
/*
 * Driver for the one-epoll-fd-per-thread experiment.
 *
 * Creates the pipe, starts NUM_THREADS workers (each of which sets up its own
 * epoll instance on the pipe's read end), writes NUM_EVENTS messages into the
 * pipe, and finally cancels and joins the workers before printing per-thread
 * and total counts.
 *
 * Returns 0 on success; any setup failure terminates the process with
 * status -1.
 */
int main(int argc, char *argv[]) {
    int id[NUM_THREADS];
    int total = 0;
    int nohit = 0;
    int i, j, err;

    (void)argc;
    (void)argv;

    if (pipe(p) < 0) {
        die("pipe");
    }

    for (i = 0; i < NUM_THREADS; ++i) {
        id[i] = i;
        /* pthread functions return the error number instead of setting errno. */
        err = pthread_create(&threads[i], NULL, run_func, &id[i]);
        if (err != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(err));
            exit(-1);
        }
    }

    for (j = 0; j < NUM_EVENTS; ++j) {
        /* The payload is irrelevant; send the first int of p as before. */
        if (write(p[1], &p[0], sizeof p[0]) != (ssize_t)sizeof p[0]) {
            die("write");
        }
        usleep(100);
    }

    for (i = 0; i < NUM_THREADS; ++i) {
        pthread_cancel(threads[i]);
        /*
         * Cancellation is only a request; join so the worker has really
         * stopped before its counter is read.
         */
        err = pthread_join(threads[i], NULL);
        if (err != 0) {
            fprintf(stderr, "pthread_join: %s\n", strerror(err));
            exit(-1);
        }
        printf("joined: %d\n", i);
        printf("event count: %d\n", event_count[i]);
        total += event_count[i];
        if (!event_count[i]) {
            nohit++;
        }
    }

    printf("total events is: %d\n", total);
    printf("nohit is: %d\n", nohit);
    return 0;
}