I’ve recently come across a strange performance penalty when using Dask that I was not expecting. The code below reproduces it:
```python
import multiprocessing
import time

import dask
from dask.distributed import Client
import numpy as np


def job(X):
    # CPU-bound work that only reads X
    # X = X.copy()  # Uncommenting this line makes Dask multiprocessing as fast as multiprocessing
    for i in range(500):
        beta = np.random.rand(X.shape[1])
        np.matmul(X.T, X * np.sin(np.matmul(X, beta)).reshape((-1, 1)))


def time_fn(fn, tag):
    start = time.time()
    fn()
    end = time.time()
    print(tag, "took {:.2f} minutes".format((end - start) / 60))


if __name__ == '__main__':
    X = np.random.rand(500000, 11)  # ~42 MiB of float64 data
    n_jobs = 4

    # Dask multiprocessing
    def fn():
        with Client(processes=True):
            jobs = [dask.delayed(job)(X) for _ in range(n_jobs)]
            dask.compute(jobs)

    time_fn(fn, "Dask multiprocessing")

    # Dask multithreading
    def fn():
        with Client(processes=False, n_workers=1):
            jobs = [dask.delayed(job)(X) for _ in range(n_jobs)]
            dask.compute(jobs)

    time_fn(fn, "Dask multithreading")

    # Python multiprocessing
    def fn():
        with multiprocessing.Pool() as p:
            p.map(job, [X] * n_jobs)

    time_fn(fn, "Python multiprocessing")
```
The output I get is the following:
```
<path> UserWarning: Sending large graph of size 41.96 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
Dask multiprocessing took 0.86 minutes
Dask multithreading took 0.46 minutes
Python multiprocessing took 0.30 minutes
```
Dask appears to be significantly slower than plain Python multiprocessing. However, if the line `X = X.copy()` is uncommented, Dask performs as fast as Python multiprocessing. Granted, the console warns about a possible slowdown, and that is why I’m here asking whether this behavior is expected or not. To be honest, I was not expecting it: I would guess that the array `X` is serialized, sent to the workers, deserialized, and then used within the `job` function. The overhead (to which I assume the warning refers) would then consist of one serialization and one deserialization operation which, being executed over localhost connections, should be very fast and cannot explain the measured slowdown. Moreover, such overhead should not change when `X = X.copy()` is used. However, this simple example seems to suggest that the worker keeps communicating back and forth with the scheduler every time it needs to access the data.
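To put a rough number on that intuition, here is a quick, Dask-free check with the standard `pickle` module (a sketch that only approximates the cost of one serialization round trip; it is not what Dask does internally):

```python
import pickle
import time

import numpy as np

X = np.random.rand(500000, 11)  # same shape as in the benchmark, ~42 MiB

start = time.time()
payload = pickle.dumps(X)          # one full serialization of X...
restored = pickle.loads(payload)   # ...and one full deserialization
elapsed = time.time() - start

print("pickle round trip of {:.1f} MiB took {:.3f} s".format(
    len(payload) / 2**20, elapsed))
```

For an array of this size the round trip should take a small fraction of a second, far too little to account for the gap between the timings above.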
So what I would like to ask is whether this overhead is expected (meaning my understanding of how Dask works is wrong), or not.
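For completeness, this is my reading of what the warning suggests: a sketch (reusing `job`, `X`, `n_jobs`, and `time_fn` from the script above, and not something I have verified to remove the penalty) that scatters `X` once and passes the resulting future to the delayed calls:

```python
# Sketch of the warning's suggestion: ship X to the workers up front so the
# task graph carries only a small reference instead of the full ~42 MiB array.
def fn():
    with Client(processes=True) as client:
        X_future = client.scatter(X, broadcast=True)  # replicate X on every worker
        jobs = [dask.delayed(job)(X_future) for _ in range(n_jobs)]
        dask.compute(jobs)

time_fn(fn, "Dask multiprocessing with scatter")
```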