I am wondering how to make multiprocessing in python (3.11
) with asynchronous calls (not asyncio
lib) and automatically close processes when they are finished?
Below I wrote a simple code and the problem that it does not close pool processes (# this prints is never executed
line are not reached), perhaps there are errors, but I don’t see in terminal output (testing on Ubuntu).
Task: I need to execute processes every timer interval and without waiting to finish start new ones. I have not found any information on Internet and decided to make pool_list
global variable to get access to pool objects (to close them later when process is done).
How to close pools correct in my problem? Maybe there is another solution? I chose concurrent.futures
lib, because later I will need to start new pool process inside process (in func on_message
), I have read that concurrent.futures
can do that (source: Python multiprocessing: is it possible to have a pool inside of a pool?).
2024-06-01 02:53:23.984849 start
2024-06-01 02:53:23.987660 sleep 5 sec
2024-06-01 02:53:23.988430 new message: message-1, pool_list len=1
2024-06-01 02:53:28.988004 start
2024-06-01 02:53:28.991552 sleep 5 sec
2024-06-01 02:53:28.992286 new message: message-2, pool_list len=2
2024-06-01 02:53:33.991890 start
2024-06-01 02:53:33.994907 sleep 5 sec
2024-06-01 02:53:33.995735 new message: message-3, pool_list len=3
2024-06-01 02:53:38.988640 finished message: message-1
2024-06-01 02:53:38.995415 start
2024-06-01 02:53:38.998443 sleep 5 sec
2024-06-01 02:53:38.999227 new message: message-4, pool_list len=4
2024-06-01 02:53:43.992491 finished message: message-2
2024-06-01 02:53:43.999209 start
2024-06-01 02:53:44.002906 sleep 5 sec
2024-06-01 02:53:44.003642 new message: message-5, pool_list len=5
2024-06-01 02:53:48.995955 finished message: message-3
2024-06-01 02:53:49.003247 start
2024-06-01 02:53:49.006613 sleep 5 sec
2024-06-01 02:53:49.007651 new message: message-6, pool_list len=6
2024-06-01 02:53:53.999423 finished message: message-4
2024-06-01 02:53:59.003858 finished message: message-5
2024-06-01 02:54:04.007971 finished message: message-6
import time
import random
import string
from datetime import datetime
# from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor as Pool
pool_list = {}
def on_message(pool_id, message):
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} new message: {message}, pool_list len={len(pool_list)}")
time.sleep(15)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} finished message: {message}")
# this prints is never executed
print('done:', pool_list[pool_id].done())
print('running:', pool_list[pool_id].running())
# close pool as it finished
pool_list[pool_id].cancel()
del pool_list[pool_id]
print('closed!', f"pool_list len={len(pool_list)}")
if __name__ == "__main__":
for i in range(1, 7):
print()
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} start")
pool_id = ''.join(random.choices(string.ascii_letters, k=10))
# we can not use "with" context, otherwise it will delay for loop until on_message() finished, but we need async
pool = Pool(1)
# adding pool to list in order to close() it in on_message()
pool_list[pool_id] = pool
pool.submit(on_message, pool_id, f'message-{i}')
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} sleep 5 sec")
time.sleep(5)