Hello, I have this problem: I have an MPI program in C that works perfectly fine, but I believe it could be improved, since the MPI_Allgather step makes every node wait for the slowest one. Here's the code:
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
// Diccionario, Point, MASTER_NODE, create_table, free_table, merge_dictionaries,
// serialize_dict, deserialize_dict, calculate_serialized_size, load_data and KNN
// come from the project's other source files (knn.c etc.)

void update_global_cache(Diccionario *global_cache, Diccionario *new_entries) {
    merge_dictionaries(global_cache, new_entries);
}
int main(int argc, char *argv[]) {
    int rank, size;
    double start_time, end_time;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    start_time = MPI_Wtime(); // Start timer

    Point *points = NULL;
    int num_points = 0, dimensions = 0;
    Diccionario *global_cache = create_table();
    // Every node loads the whole CSV on its own (see my note below the code)
    load_data("ciudades_a_analizar.csv", &points, &num_points, &dimensions);
    printf("Process %d: Data loaded: %d points with %d dimensions each.\n", rank, num_points, dimensions);

    // Block distribution: e.g. with num_points = 10 and size = 4,
    // bloque_trabajo = 2 and restante = 2, so ranks get 3, 3, 2 and 2 points
    int bloque_trabajo = num_points / size;
    int restante = num_points % size;
    int work_done = rank * bloque_trabajo + (rank < restante ? rank : restante);
    int end = work_done + bloque_trabajo - 1 + (rank < restante ? 1 : 0);

    struct timespec start_time_work, end_time_work;
    clock_gettime(CLOCK_MONOTONIC, &start_time_work);
    for (int i = work_done; i <= end; i++) {
        struct timespec iteration_start, iteration_end;
        clock_gettime(CLOCK_MONOTONIC, &iteration_start);
        printf("Node %d: at point %d, last: %d\n", rank, i, end);
        Diccionario *new_cache = create_table();
        int resultado = KNN(points[i], global_cache, new_cache);
        printf("Node %d: finished my KNN\n", rank);
        char *serialized_dict = serialize_dict(new_cache);
        int local_serialized_size = calculate_serialized_size(new_cache);
        if (rank == MASTER_NODE) {
            // Gather every node's serialized size, then the dictionaries themselves
            int *serialized_sizes = (int *)malloc(size * sizeof(int));
            MPI_Gather(&local_serialized_size, 1, MPI_INT, serialized_sizes, 1, MPI_INT, MASTER_NODE, MPI_COMM_WORLD);
            int *displs = (int *)malloc(size * sizeof(int));
            displs[0] = 0;
            for (int j = 1; j < size; j++) {
                displs[j] = displs[j - 1] + serialized_sizes[j - 1];
            }
            int total_size = displs[size - 1] + serialized_sizes[size - 1];
            char *all_serialized_dicts = (char *)malloc(total_size);
            MPI_Gatherv(serialized_dict, local_serialized_size, MPI_BYTE, all_serialized_dicts, serialized_sizes, displs, MPI_BYTE, MASTER_NODE, MPI_COMM_WORLD);
            // Process all the dictionaries on the master node
            Diccionario *final = create_table();
            for (int j = 0; j < size; j++) {
                char *buffer = all_serialized_dicts + displs[j];
                Diccionario *dict = deserialize_dict(buffer, serialized_sizes[j]);
                merge_dictionaries(final, dict);
                free_table(dict);
            }
            // Send the updated global dictionary to every node
            char *global_serialized_dict = serialize_dict(final);
            int global_serialized_size = calculate_serialized_size(final);
            MPI_Bcast(&global_serialized_size, 1, MPI_INT, MASTER_NODE, MPI_COMM_WORLD);
            MPI_Bcast(global_serialized_dict, global_serialized_size, MPI_BYTE, MASTER_NODE, MPI_COMM_WORLD);
            // Update the master's own cache with the merged results too
            merge_dictionaries(global_cache, final);
            free(all_serialized_dicts);
            free(serialized_sizes);
            free(displs);
            free(global_serialized_dict);
            free_table(final);
        } else {
            // Send the local dictionary to the master node
            MPI_Gather(&local_serialized_size, 1, MPI_INT, NULL, 1, MPI_INT, MASTER_NODE, MPI_COMM_WORLD);
            MPI_Gatherv(serialized_dict, local_serialized_size, MPI_BYTE, NULL, NULL, NULL, MPI_BYTE, MASTER_NODE, MPI_COMM_WORLD);
            // Wait to receive the updated global dictionary
            int global_serialized_size;
            MPI_Bcast(&global_serialized_size, 1, MPI_INT, MASTER_NODE, MPI_COMM_WORLD);
            char *global_serialized_dict = (char *)malloc(global_serialized_size);
            MPI_Bcast(global_serialized_dict, global_serialized_size, MPI_BYTE, MASTER_NODE, MPI_COMM_WORLD);
            Diccionario *final = deserialize_dict(global_serialized_dict, global_serialized_size);
            merge_dictionaries(global_cache, final);
            free(global_serialized_dict);
            free_table(final);
        }
        // Free memory
        free(serialized_dict);
        free_table(new_cache);

        clock_gettime(CLOCK_MONOTONIC, &iteration_end);
        double time_spent_iteration = (iteration_end.tv_sec - iteration_start.tv_sec) +
                                      (iteration_end.tv_nsec - iteration_start.tv_nsec) / 1e9;
        printf("Process %d: Time for iteration %d: %f seconds\n", rank, i, time_spent_iteration);
    }
    clock_gettime(CLOCK_MONOTONIC, &end_time_work);
    double time_spent_work = (end_time_work.tv_sec - start_time_work.tv_sec) +
                             (end_time_work.tv_nsec - start_time_work.tv_nsec) / 1e9;
    if (rank == MASTER_NODE) {
        end_time = MPI_Wtime(); // End timer
        double time_spent_total = end_time - start_time;
        printf("Process 0: Total computation time: %f seconds\n", time_spent_total);
        printf("Process 0: Total work time: %f seconds\n", time_spent_work);
    }
    free_table(global_cache); // created with create_table(), so free with free_table()
    MPI_Finalize();
    return 0;
}
It's a lot of lines, but it basically loads a CSV into an array (each node loads its own copy; I know I could load the CSV on one node and then share it with Bcast, but I had tons of problems trying that, so I left it this way), then splits the array into equal parts, one per node. Each node then executes KNN, which is defined in knn.c (it doesn't really matter for this problem) and returns a new_cache with the results of the calculation, so if I need to redo those calculations I already have the result in the cache and avoid redundancy. Each node must share its new_cache findings with every other node for this to be efficient.

The problem is that each node has to wait for the rest of the nodes in the MPI_Allgather step (in the code above, the equivalent MPI_Gather/MPI_Bcast sequence), and KNN is a very demanding function (about 60 seconds, made so on purpose), so making the nodes wait for the slowest one doesn't seem like a good strategy. I'm trying to find a way to make the exchange asynchronous. If someone has an idea on how to implement this in a different manner, I'm glad to hear it. It's my first MPI project, so I'm not really an expert with MPI. Thanks for the help.
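To show the kind of overlap I have in mind, here is a minimal, untested sketch of what the loop body could look like with the nonblocking collectives from MPI-3 (MPI_Iallgather / MPI_Iallgatherv). The serialize/merge helpers are the ones from my code above; computing point i + 1 during the exchange is my own assumption of where the overlap would come from:

// Inside the per-point loop, replacing the blocking Gather/Bcast block:
MPI_Request size_req, data_req;
int my_size = calculate_serialized_size(new_cache);
int *sizes = (int *)malloc(size * sizeof(int));

// Start exchanging the serialized sizes without blocking
MPI_Iallgather(&my_size, 1, MPI_INT, sizes, 1, MPI_INT, MPI_COMM_WORLD, &size_req);
MPI_Wait(&size_req, MPI_STATUS_IGNORE);

// Build displacements and start exchanging the payloads
int *displs = (int *)malloc(size * sizeof(int));
displs[0] = 0;
for (int j = 1; j < size; j++) displs[j] = displs[j - 1] + sizes[j - 1];
int total = displs[size - 1] + sizes[size - 1];
char *all = (char *)malloc(total);
MPI_Iallgatherv(serialized_dict, my_size, MPI_BYTE, all, sizes, displs,
                MPI_BYTE, MPI_COMM_WORLD, &data_req);

// Compute the next KNN while the exchange is in flight
// (only if i + 1 is still inside this rank's block):
// resultado = KNN(points[i + 1], global_cache, next_cache);

MPI_Wait(&data_req, MPI_STATUS_IGNORE);
// Deserialize and merge everyone's dictionaries into global_cache as before

This still meets once per round at the MPI_Wait, but the 60-second KNN and the communication would at least overlap. Is that a reasonable direction?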
I also tried to implement a master node that collects the dictionaries from the other nodes asynchronously and then Bcasts the merged result, but it didn't really work and I still don't know why. As I said before, I'm still new to this library, so I'm no expert; it might be easier than I think, but I'm really having a hard time with this.
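For reference, this is roughly the shape I was going for (a hedged sketch, not my actual attempt, and TAG_DICT is a tag I made up): workers push their serialized dictionary to the master with a nonblocking send, and the master polls with MPI_Iprobe between its own KNN iterations instead of blocking until every node reports:

#define TAG_DICT 42 // made-up tag for dictionary messages

// Worker side: send this iteration's dictionary without blocking.
// The caller must MPI_Wait on *req before freeing serialized_dict.
static void send_cache_async(char *serialized_dict, int len, MPI_Request *req) {
    MPI_Isend(serialized_dict, len, MPI_BYTE, MASTER_NODE, TAG_DICT,
              MPI_COMM_WORLD, req);
}

// Master side: between its own KNN iterations, drain whatever
// dictionaries have arrived so far without waiting for slow nodes.
static void drain_incoming(Diccionario *global_cache) {
    int flag;
    MPI_Status status;
    MPI_Iprobe(MPI_ANY_SOURCE, TAG_DICT, MPI_COMM_WORLD, &flag, &status);
    while (flag) {
        int count;
        MPI_Get_count(&status, MPI_BYTE, &count); // size of the pending message
        char *buf = (char *)malloc(count);
        MPI_Recv(buf, count, MPI_BYTE, status.MPI_SOURCE, TAG_DICT,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        Diccionario *d = deserialize_dict(buf, count);
        merge_dictionaries(global_cache, d);
        free_table(d);
        free(buf);
        MPI_Iprobe(MPI_ANY_SOURCE, TAG_DICT, MPI_COMM_WORLD, &flag, &status);
    }
}

The part I couldn't figure out is how the master then pushes the merged cache back out without everyone blocking in MPI_Bcast again, which is where my attempt fell apart.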