I have a class that has something like the following context manager to create a dask client & cluster:
import os
from contextlib import contextmanager

from dask.distributed import Client, LocalCluster


class some_class:
    def __init__(self, engine_kwargs: dict = None):
        engine_kwargs = engine_kwargs or {}  # guard: .get() would fail on the None default
        self.distributed = engine_kwargs.get("distributed", False)
        # referenced in dask_context below; defaulting to "processes" here
        self.distributed_mode = engine_kwargs.get("distributed_mode", "processes")
        self.dask_client = None
        self.dask_cluster = None
        self.n_workers = engine_kwargs.get(
            "n_workers", int(os.getenv("SLURM_CPUS_PER_TASK", os.cpu_count()))
        )

    @contextmanager
    def dask_context(self):
        """Dask context manager to set up and close down the client."""
        if self.distributed:
            processes = self.distributed_mode == "processes"
            self.dask_cluster = LocalCluster(n_workers=self.n_workers, processes=processes)
            self.dask_client = Client(self.dask_cluster)
        try:
            yield
        finally:
            if self.dask_client is not None:
                self.dask_client.close()
                self.dask_cluster.close()  # was self.local_cluster, which is never set
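For context, the class is constructed with an engine_kwargs dict, roughly like this (the values here are made up):

engine_kwargs = {"distributed": True, "distributed_mode": "processes", "n_workers": 4}
obj = some_class(engine_kwargs)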
In the class I have a method that uses delayed, with the intention of distributing the work across the cluster:
from dask import compute, delayed
from dask.diagnostics import ProgressBar

def some_class_method(self):
    min_ind = segy_container.trace_headers["SOME_GROUPER"].values.min()
    max_ind = segy_container.trace_headers["SOME_GROUPER"].values.max()
    tasks = [
        delayed(self._process_group)(index, some, other, method, arguments, here)
        for index in range(min_ind, max_ind + 1)
    ]
    with ProgressBar():
        with self.dask_context():
            results = compute(*tasks, scheduler=self.dask_client)  # scheduler="processes")
In the last line, if I try to use the dask_client that the context manager sets up, I get the following error:
TypeError: cannot pickle '_asyncio.Task' object
If I get rid of the context manager and use scheduler="processes" instead, it works fine.
I assume scheduler="processes" doesn't serialise the tasks, so it proceeds OK, but when the local cluster is used they are serialised, and that throws the error.
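If it helps, here is a tiny standalone illustration of what I think is happening (Holder and its lock are hypothetical stand-ins, not my real code): pickling a bound method also pickles the instance it is bound to, so once the instance holds something unpicklable, any scheduler that pickles tasks will fail the same way.

import pickle
import threading

class Holder:
    def __init__(self):
        # stands in for the unpicklable internals that the dask Client holds
        self.lock = threading.Lock()

    def work(self, x):
        return x + 1

h = Holder()
pickle.dumps(h.work)  # TypeError: cannot pickle '_thread.lock' object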
Is it possible to use the local cluster with delayed and compute in some way, or is there another approach to solving the problem?
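To make the question concrete, this untested sketch is the sort of restructuring I'm wondering about: keeping the worker at module level so that self (and the client it holds) never enters the task graph, and relying on the fact that an active Client becomes the default scheduler:

from dask import compute, delayed
from dask.distributed import Client, LocalCluster

def process_group(index):  # hypothetical stand-in for _process_group
    return index * 2

cluster = LocalCluster(n_workers=4, processes=True)
client = Client(cluster)  # an active Client registers itself as the default scheduler
try:
    tasks = [delayed(process_group)(i) for i in range(10)]
    results = compute(*tasks)  # no scheduler= needed while the client is active
finally:
    client.close()
    cluster.close()

Is something along those lines the intended pattern, or can the bound-method design be kept?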