Considering you have the following code:
import asyncio
import functools
if __name__ == '__main__':
asyncio.run(main())
async def main():
background_t = set()
for i in range(6):
task = asyncio.create_task(foo1(i))
background_t.add(task)
async def foo1(i):
loop = asyncio.get_running_loop()
try:
await asyncio.wait_for(loop.run_in_executor(None, functools.partial(foo2(), i),60)
except asyncio.TimeoutError:
print("timed out")
def foo2(num):
# some blocking code
My question if this pattern causes the code to run concurrently in six different tasks when each of them is synchronous operation.
7
Almost there:
the blocking functions are correctly scheduled to run in different threads in the default executor, and some of those will star running, and execute concurrently.
However, there will be only so many of those tasks running as there are workers in the executor – and each of them will take a full worker for the time it is running. (So, if a task is a continuous loop, with no return, or takes more than 60 seconds, it will simply block a worker).
The OS and Python runtime will switch automatically across all running such tasks (even though due to Python limitations up to 3.12 they will only use a single CPU core at a time, and with a special “free threading” build of 3.13 you can use actual parallelism in various cores for free). The problem is that any task scheduled after all workers are busy won’t start until there is a free worker – it will just be queued for execution.
If the blocking cause is I/O code, just creating an explicit executor with at least as many workers as there are tasks you want to run in parallel will work they way you plan.
A better strategy, if the workers are to be kept running, is to use an explicit thread for each task, with asyncio.to_thread
instead.
And keep in mind that the synchronous functions in other threads have no way to be automatically cancelled: even if the awaitable wrapper call (either run_in_executor
or to_thread
) is cancelled, for example, by timing-out, the actual synchronous code will keep running and using system resources until it finishes naturally.
The only fix for that is to create a state which can be checked inside the sync code (for example, an instance attribute, or a value posted to a Queue (thread-oriented queue from queue.Queue
, not asyncio.Queue
)
As a side note, Python code in a module is executed in order – you call to the main()
function must be at the botton. The if __name__ ...
device is not mandatory – it is a thing that Can be used and allows your module to either be importable into a larger project, or run as an standalone app. If it is not meant to be imported, just call main()
in the last line, at the module level.