I'm working on a project about PageRank. All the calculations in this (sequential) function are correct:
double *pagerank(grafo *g, double d, double eps, int maxiter, int taux, int *numiter){
    // Teleporting contribution
    double tp = (1.0 - d) / g->N;
    double x[g->N];
    double xnext[g->N];
    double y[g->N];
    double err = eps;
    for(int i = 0; i < g->N; i++){
        x[i] = 1.0 / g->N;
    }
    while(err >= eps && *numiter < maxiter){
        // Compute xnext
        // Dead-end node contribution
        double st = 0.0;
        for(int i = 0; i < g->N; i++){
            if(g->out[i] == 0){
                st += x[i];
                y[i] = 0.0;
            } else {
                y[i] = x[i] / g->out[i];
            }
        }
        // Compute the j-th component
        for(int j = 0; j < g->N; j++){
            double sum_in = 0.0;
            for(int i = 0; i < g->in[j].messi; i++){
                sum_in += y[g->in[j].a[i]];
            }
            xnext[j] = tp + d * sum_in + (d / g->N) * st;
        }
        // Compute the error
        err = 0.0;
        for(int i = 0; i < g->N; i++){
            err += fabs(xnext[i] - x[i]);
        }
        *numiter += 1;
        // Copy xnext into x
        for(int i = 0; i < g->N; i++){
            x[i] = xnext[i];
        }
    }
    double *solution = malloc(sizeof(double) * g->N);
    if(solution == NULL) termina("Errore malloc solution");
    for(int i = 0; i < g->N; i++){
        solution[i] = x[i];
    }
    return solution;
}
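For reference, the update this code implements is (writing it out in my own notation): at every iteration, for each node j,

$$x_j^{(t+1)} = \frac{1-d}{N} + d \sum_{i \in \mathrm{in}(j)} \frac{x_i^{(t)}}{\mathrm{out}(i)} + \frac{d}{N} \sum_{i:\,\mathrm{out}(i)=0} x_i^{(t)},$$

and the loop stops when $\sum_j |x_j^{(t+1)} - x_j^{(t)}| < \varepsilon$ or after maxiter iterations.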
Now what I have to do is parallelize the calculations. I have these constraints:
- All the threads must stay active for the whole computation;
- Every thread must contribute to the computation of the vector at time t;
- The division of labor is dynamic: computing the individual components of x can take very different amounts of time, so as soon as a thread finishes one computation it must be able to start a new one (see the sketch after this list);
- Joining the threads is allowed only after the last computation.
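To make the third constraint concrete, here is a minimal, self-contained sketch of the dynamic work-assignment idea I am aiming for: a shared counter protected by a mutex from which each thread grabs the next component index. The names (worker, next_index, NCOMP, NTHREADS) and the placeholder computation are mine and are not part of my project code; it only illustrates the division of labor for a single pass, not the full per-iteration protocol.

#include <pthread.h>
#include <stdio.h>

#define NCOMP 8                      /* number of components, illustration only */
#define NTHREADS 4

static pthread_mutex_t idx_mutex = PTHREAD_MUTEX_INITIALIZER;
static int next_index = 0;           /* next component still to be computed */
static double xnext[NCOMP];          /* components written by the workers */

static void *worker(void *arg) {
    (void)arg;
    while (1) {
        /* grab the next free component index under the mutex */
        pthread_mutex_lock(&idx_mutex);
        int j = next_index++;
        pthread_mutex_unlock(&idx_mutex);
        if (j >= NCOMP) break;        /* nothing left in this pass */
        xnext[j] = (double)j;         /* placeholder for the real computation */
    }
    return NULL;
}

int main(void) {
    pthread_t t[NTHREADS];
    for (int i = 0; i < NTHREADS; i++)
        pthread_create(&t[i], NULL, worker, NULL);
    for (int i = 0; i < NTHREADS; i++)
        pthread_join(t[i], NULL);
    for (int j = 0; j < NCOMP; j++)
        printf("xnext[%d] = %.1f\n", j, xnext[j]);
    return 0;
}

In my actual code the equivalent of next_index is *dati->index, protected by mut2.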
This is what I came up with:
void *calcolo(void *arg){
    // Cast the argument
    cdati *dati = (cdati *)arg;
    int componente;
    while (true) {
        xpthread_mutex_lock(dati->mut, QUI);
        while (*dati->terminati != dati->nthreads) {
            xpthread_cond_wait(dati->cond, dati->mut, QUI);
        }
        if (*dati->fine) {
            xpthread_mutex_unlock(dati->mut, QUI);
            break;
        }
        xpthread_mutex_unlock(dati->mut, QUI);
        // Compute the j-th component
        xpthread_mutex_lock(dati->mut2, QUI);
        componente = *(dati->index);
        (*(dati->index))++;
        xpthread_mutex_unlock(dati->mut2, QUI);
        // End of the computation of the vector at time t
        if (componente >= dati->g->N) {
            // Compute the error, y, and st for the next iteration
            double newst = 0.0;
            double newy[dati->g->N];
            for (int i = dati->start; i < dati->end; i++) {
                if (dati->g->out[i] == 0) {
                    newst += dati->x[i];
                    newy[i] = 0.0;
                } else {
                    newy[i] = dati->x[i] / dati->g->out[i];
                }
            }
            double err = 0.0;
            for (int i = dati->start; i < dati->end; i++) {
                err += fabs(dati->xnext[i] - dati->x[i]);
            }
            // Update shared data
            // xpthread_mutex_lock(dati->mut2, QUI);
            *(dati->st) += newst;
            *(dati->err) += err;
            for (int i = dati->start; i < dati->end; i++) {
                dati->y[i] = newy[i];
                dati->x[i] = dati->xnext[i];
            }
            // xpthread_mutex_unlock(dati->mut2, QUI);
            // Signal termination
            xpthread_mutex_lock(dati->mut, QUI);
            (*(dati->terminati))--;
            xpthread_cond_signal(dati->cond, QUI);
            xpthread_mutex_unlock(dati->mut, QUI);
            continue;
        }
        double sum_in = 0.0;
        for (int i = 0; i < dati->g->in[componente].messi; i++) {
            sum_in += dati->y[dati->g->in[componente].a[i]];
        }
        dati->xnext[componente] = dati->tp + dati->d * sum_in + (dati->d / dati->g->N) * *(dati->st);
    }
    pthread_exit(NULL);
}
double *pagerank(grafo *g, double d, double eps, int maxiter, int taux, int *numiter){
    // Data initialization
    cdati p[taux];
    pthread_t t[taux];
    pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t mut2 = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t c = PTHREAD_COND_INITIALIZER;
    int terminati = taux;
    int index = 0;
    bool fine = false;
    // Teleporting contribution
    double tp = (1.0 - d) / g->N;
    double x[g->N];
    double xnext[g->N];
    double y[g->N];
    double err = eps;
    for(int i = 0; i < g->N; i++){
        x[i] = 1.0 / g->N;
    }
    // Dead-end node contribution for the first iteration
    double st = 0.0;
    for(int i = 0; i < g->N; i++){
        if(g->out[i] == 0){
            st += x[i];
            y[i] = 0.0;
        } else {
            y[i] = x[i] / g->out[i];
        }
    }
    xpthread_mutex_lock(&mut, QUI);
    // Create the auxiliary threads
    for(int j = 0; j < taux; j++){
        int n = g->N / taux;
        p[j].start = n*j;
        p[j].end = (j == taux-1) ? g->N : n*(j+1);
        p[j].index = &index;
        p[j].fine = &fine;
        p[j].nthreads = taux;
        p[j].x = x;
        p[j].y = y;
        p[j].xnext = xnext;
        p[j].tp = tp;
        p[j].err = &err;
        p[j].st = &st;
        p[j].mut = &mut;
        p[j].mut2 = &mut2;
        p[j].cond = &c;
        p[j].terminati = &terminati;
        p[j].g = g;
        p[j].d = d;
        xpthread_create(&t[j], NULL, &calcolo, &p[j], QUI);
    }
    while(err >= eps && *numiter < maxiter){
        terminati = taux;
        xpthread_cond_broadcast(&c, QUI);
        // Wait for the threads to finish
        while(terminati > 0){
            xpthread_cond_wait(&c, &mut, QUI);
        }
        st = 0.0;
        err = 0.0;
        for(int i = 0; i < taux; i++){
            st += *p[i].st;
            err += *p[i].err;
            for(int j = p[i].start; j < p[i].end; j++){
                y[j] = p[i].y[j];
            }
        }
        for(int i = 0; i < taux; i++){
            p[i].y = y;
            p[i].st = &st;
            p[i].err = &err;
        }
        index = 0;
        *numiter += 1;
    }
    fine = true;
    terminati = taux;
    xpthread_cond_broadcast(&c, QUI);
    xpthread_mutex_unlock(&mut, QUI);
    // Join the auxiliary threads
    for(int i = 0; i < taux; i++){
        xpthread_join(t[i], NULL, QUI);
    }
    xpthread_mutex_destroy(&mut, QUI);
    xpthread_mutex_destroy(&mut2, QUI);
    xpthread_cond_destroy(&c, QUI);
    double *solution = malloc(sizeof(double) * g->N);
    if(solution == NULL) termina("Errore malloc solution");
    for(int i = 0; i < g->N; i++){
        solution[i] = x[i];
    }
    return solution;
}
The problem is that my program sometimes enters a starvation phase and hangs. (The results of my parallelized version may also be wrong.)
Where am I using synchronization incorrectly?