I want to run billions of IO-bound tasks across a pool of thousands of threads. These are my challenges:
- Reduce memory usage. The thread pool from `concurrent.futures` uses an unbounded queue. Too many submissions and memory usage balloons, but too few and not enough work gets done (see the sketch after this list).
- Collect results. Neither the pool nor its context manager stores completed futures, so collecting results isn’t necessary to reduce memory usage. I would still like the option of collecting results if at some future time I need them.
- Handle exceptions. I would like Python exceptions raised in any of the tasks to be propagated to the main thread.
- Incorporate asyncio via `loop.run_in_executor()`.
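For concreteness, here is the naive pattern I’m starting from; `io_task` and the worker count are placeholders for my real setup:

```python
from concurrent.futures import ThreadPoolExecutor

def io_task(n):
    ...  # stand-in for the real IO-bound work

with ThreadPoolExecutor(max_workers=2000) as pool:
    for n in range(10**9):
        # Every submission a worker hasn't yet picked up sits in the pool's
        # internal, unbounded work queue, so memory grows without limit.
        pool.submit(io_task, n)
```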
Each future from `concurrent.futures` can have an `add_done_callback` that collects the result and handles any exceptions. Exceptions raised within these callbacks are ignored, and the callbacks run in the worker thread that completed the future, but an exception can still be stored in a shared variable and re-raised from the submission loop in the main thread. This doesn’t help in reducing memory usage, and I haven’t figured out how to do that even through asyncio (see below).
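A sketch of that callback approach (`io_task` again stands in for the real work):

```python
from concurrent.futures import ThreadPoolExecutor

def io_task(n):
    ...  # stand-in for the real IO-bound work

results = []        # optional: completed results accumulate here
first_error = None  # written by callbacks, read by the submission loop

def on_done(future):
    global first_error
    exc = future.exception()  # non-blocking: the future is already done
    if exc is not None:
        if first_error is None:
            first_error = exc
    else:
        results.append(future.result())

with ThreadPoolExecutor(max_workers=2000) as pool:
    for n in range(10**9):
        if first_error is not None:
            raise first_error  # propagate a task exception to the main thread
        pool.submit(io_task, n).add_done_callback(on_done)
```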
To incorporate asyncio I create a consumer task and a producer task. The producer submits callables to the loop’s executor in a synchronous loop, placing the resulting futures onto an unbounded queue.
The consumer maintains a set of pending futures, awaiting the first result from either that set or the queue. It awaits the result of `asyncio.wait()` over both a `queue.get()` task and a second `asyncio.wait()` coroutine over the set of pending futures. If the result comes from the queue, it adds the new future to the set and resumes waiting. If the result is a completed future, it processes the result and handles exceptions before resuming the wait. The task is complete when the queue shuts down.
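Here is a minimal sketch of that wiring, assuming Python 3.13+ for `asyncio.Queue.shutdown()`, with `io_task` standing in for the real work; for simplicity it flattens the nested wait into a single `asyncio.wait()` over the combined set:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def io_task(n):
    ...  # stand-in for the real IO-bound work

async def producer(queue: asyncio.Queue, pool: ThreadPoolExecutor) -> None:
    loop = asyncio.get_running_loop()
    for n in range(10**9):
        # Submit to the pool and hand the resulting future to the consumer.
        await queue.put(loop.run_in_executor(pool, io_task, n))
    queue.shutdown()  # Python 3.13+: signals the consumer via QueueShutDown

async def consumer(queue: asyncio.Queue) -> None:
    pending: set[asyncio.Future] = set()
    getter = asyncio.ensure_future(queue.get())
    while getter is not None or pending:
        waiting = pending | ({getter} if getter is not None else set())
        done, _ = await asyncio.wait(waiting, return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            if fut is getter:
                try:
                    pending.add(fut.result())  # a new executor future off the queue
                    getter = asyncio.ensure_future(queue.get())
                except asyncio.QueueShutDown:
                    getter = None  # the producer is finished
            else:
                pending.discard(fut)
                fut.result()  # process the result; re-raises any task exception

async def main() -> None:
    with ThreadPoolExecutor(max_workers=2000) as pool:
        queue: asyncio.Queue = asyncio.Queue()
        await asyncio.gather(producer(queue, pool), consumer(queue))

asyncio.run(main())
```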
Am I making this overly convoluted? Is there a simpler approach?
I haven’t figured out a neat way to reduce memory usage. The best I can come up with is to check the thread pool’s `_work_queue.qsize()`, and if it exceeds a threshold, to simply `await asyncio.sleep(<small number>)`, as in the sketch below. Surely there must be a better way?
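For reference, this is roughly what that hack looks like inside the producer; the threshold and sleep interval are arbitrary, and `_work_queue` is a CPython implementation detail:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def io_task(n):
    ...  # stand-in for the real IO-bound work

MAX_BACKLOG = 10_000  # arbitrary threshold

async def producer(queue: asyncio.Queue, pool: ThreadPoolExecutor) -> None:
    loop = asyncio.get_running_loop()
    for n in range(10**9):
        # Poll the pool's private work queue and back off while it's too full.
        while pool._work_queue.qsize() > MAX_BACKLOG:
            await asyncio.sleep(0.01)
        await queue.put(loop.run_in_executor(pool, io_task, n))
    queue.shutdown()  # Python 3.13+
```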