I am trying to understand the difference between the dask.compute() function, the .compute() method, and Client.
I understand that dask.compute() operates on multiple dask collections at once (see here), but what I want to ask is whether, in that case, the operations happen in a pool of threads. I imagine so, since intermediate results are shared.
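For example, this is the kind of usage I have in mind (a minimal sketch, just to illustrate what I mean by "multiple collections at once"):

import dask
import dask.array as da

x = da.ones((1000, 1000), chunks=(100, 100))

# Both results come from a single dask.compute() call, so the shared
# intermediate chunks of x should only need to be computed once
x_sum, x_mean = dask.compute(x.sum(), x.mean())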
I have some code that runs fine on my local PC. I don't see any new Python processes getting spawned; I do, however, see a spike in one of my CPU cores, which reaches 100%, and sometimes the load is split across more than one CPU.
Is there a way to use multiple Python processes with dask.compute?
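What I had in mind is something along these lines, i.e. passing a scheduler keyword to dask.compute (just a sketch of what I was hoping to try; I am not sure this is the right mechanism):

# Sketch: ask dask.compute to use the multiprocessing scheduler instead of threads
results = dask.compute(*lazy_results, scheduler="processes")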
I used a very simple example (taken from here; it is the function fun_1 in the code below) to experiment with Client, so that I can manually control the number of processes and threads. For that I wrote fun_2, but I must be doing something really wrong, because execution time is now a lot slower. When the input to these functions is a pair of arrays of length 100, on my PC the execution time jumps from 0.85 s for fun_1 to 14.71 s for fun_2.
Also, when the input array A has length 100 and B has length 1000, I get UserWarning: Sending large graph of size ....
Could someone help me understand what I am doing wrong with fun_2 and how I should change it, please?
Finally, it is really strange that the vanilla Python function (without Dask) named fun_3 is always the fastest by a very big margin! Is this because the case here is so simple that Dask doesn't really help?
I have a more complicated and heavier case, and I would like to use Dask to run it on my local PC but also on a multi-node compute cluster facility, so I am trying to understand how I should structure my code to make the best use of Dask. Any comments will be greatly appreciated.
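For the cluster case, my (possibly naive) plan is to create the Client once, pointed at whatever scheduler address the facility provides, and keep the rest of the code the same, roughly like this (the address below is just a placeholder):

from dask.distributed import Client

# Placeholder address; the real one would come from the cluster facility
client = Client("tcp://scheduler-address:8786")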
import dask
import dask.array as da
from dask.distributed import Client, progress
import numpy as np
import time
def f(x, y):
    return min(x, y)


def g(x, y):
    return x + y


def fun_1(A, B):
    lazy_results = []
    for a in A:
        for b in B:
            if a < b:
                c = dask.delayed(f)(a, b)  # add lazy task
            else:
                c = dask.delayed(g)(a, b)  # add lazy task
            lazy_results.append(c)
    results = dask.compute(*lazy_results)
    print(sum(results))


def fun_2(A, B):
    lazy_results = []
    for a in A:
        for b in B:
            if a < b:
                c = dask.delayed(f)(a, b)  # add lazy task
            else:
                c = dask.delayed(g)(a, b)  # add lazy task
            c = da.from_delayed(c, shape=(), dtype=np.float64)
            lazy_results.append(c)
    client = Client(threads_per_worker=8, n_workers=8)
    results = da.block(lazy_results).compute()
    print(sum(results))


def fun_3(A, B):
    """
    Simple sequential function. No dask involved here
    """
    results = []
    for a in A:
        for b in B:
            if a < b:
                c = f(a, b)
            else:
                c = g(a, b)
            results.append(c)
    print(sum(results))
if __name__ == "__main__":
    np.random.seed(0)
    A = np.random.random(100)
    B = np.random.random(100)

    functions = [fun_1, fun_2, fun_3]
    for fun in functions:
        tic = time.time()
        fun(A, B)
        toc = time.time()
        print(f"{fun}: Computation time: {toc - tic:.2f} seconds\n")