I am trying to use multiprocessing
to optimize some calculations. The code to start it looks something like this:
import itertools
import multiprocessing

import dill

if __name__ == "__main__":
    params = list(itertools.product(cutoff_hours, cutoff_minutes, chunk_sizes, buffer_sizes, products, optimize_ons))
    params_dilled = [dill.dumps(param) for param in params]
    with multiprocessing.Pool(processes=20, initializer=worker_init, maxtasksperchild=10) as pool:
        results = pool.map(compute_for_parameters, params_dilled)
where:
def worker_init():
    globals()["dill"] = dill
and:
def compute_for_parameters(params_dilled):
    try:
        params = dill.loads(params_dilled)
        var1, var2, var3 = params
        try:
            starter = StarterClass(
                var1=var1,
                var2=var2,
                var3=var3,
            )
            results = starter.get_data()
            return results
        except Exception as e:
            logging.error(f"Error processing parameters {params}: {e}")
            raise
    finally:
        logger.info(f"Clearing cache...")
        del pnl, results
        gc.collect()
Each instance of StarterClass loads some data (unfortunately a lot), roughly 500MB per instance.
My issue is that in the beginning everything runs fine: it loads, calculates, and goes on to the next task. But as the multiprocessing progresses, I can see more and more data being cached in memory, and eventually all my RAM is used for caching and everything halts or becomes extremely slow.
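To put a number on what I'm seeing, something like the sketch below could log each worker's resident memory at the start and end of a task (my assumption: psutil is available in the worker environment; log_rss is just a helper written for this post, not part of my actual code):

import logging
import os

import psutil  # assumed to be installed; used only to read the process RSS

logger = logging.getLogger(__name__)

def log_rss(tag: str) -> None:
    """Log the resident set size (RSS) of the current worker process, in MB."""
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / (1024 ** 2)
    logger.info(f"[pid {os.getpid()}] {tag}: RSS {rss_mb:.0f} MB")

# e.g. call log_rss("before StarterClass") at the top of compute_for_parameters
# and log_rss("after gc.collect()") in its finally block.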
How do I work around this, or at least clear the cached data whenever a task is done?
The

finally:
    logger.info(f"Clearing cache...")
    del pnl, results
    gc.collect()

block doesn't seem to do much. Also, results doesn't take up much memory, so even though the pool is holding on to those return values, that shouldn't be the cause of the problem.
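For reference, this is roughly how one could sanity-check the size of the returned results (just a sketch; payload_size_mb is a hypothetical helper, and len(dill.dumps(...)) is only a rough proxy for in-memory size):

import dill

def payload_size_mb(obj) -> float:
    """Rough size estimate: length of the dill-serialized object, in MB."""
    return len(dill.dumps(obj)) / (1024 ** 2)

# e.g. inside compute_for_parameters, just before returning:
#     logger.info(f"results payload: {payload_size_mb(results):.1f} MB")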
Or do I need some other kind of approach for this?
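For example, would something along these lines be the right direction? It's the same pool setup as above, just with maxtasksperchild=1 and chunksize=1 (untested sketch), so every child process handles a single parameter set and is then replaced, which should let the OS reclaim the ~500MB it loaded:

import itertools
import multiprocessing

import dill

if __name__ == "__main__":
    params = list(itertools.product(cutoff_hours, cutoff_minutes, chunk_sizes, buffer_sizes, products, optimize_ons))
    params_dilled = [dill.dumps(param) for param in params]

    # maxtasksperchild=1: each worker exits after one task, so its memory goes back to the OS.
    # chunksize=1: keep one parameter set per task so the recycling happens per parameter set.
    with multiprocessing.Pool(processes=20, initializer=worker_init, maxtasksperchild=1) as pool:
        results = pool.map(compute_for_parameters, params_dilled, chunksize=1)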