On python 3.11
and Ubuntu
I have a task to init asynchronous calls every time interval (not asyncio
) and inside child do multiprocessing task. I have 36 cores / 72 processors. The problem is when I init new Pool(72)
it takes 0.3 seconds that is too much for my task, because performance matters. With this article Python Process Pool non-daemonic? I found out how to do new pool inside pool (using NoDaemonProcess
). But how to init child pool only once? concurrent.futures
does not good for me, because I made test and it’s slower than multiprocessing
.
Here is working example, I need to modify somehow to init pool inside child only once.
parent pid=907058
2024-06-01 19:16:44.856839 start
2024-06-01 19:16:44.861229 sleep 4 sec
2024-06-01 19:16:44.861777 [907059] on_message(): 1
2024-06-01 19:16:44.866430 [907059] starting pool..
2024-06-01 19:16:44.867275 worker_function(), a=907059_1
2024-06-01 19:16:44.867373 worker_function(), a=907059_2
2024-06-01 19:16:44.867410 worker_function(), a=907059_3
2024-06-01 19:16:48.861738 start
2024-06-01 19:16:48.864965 sleep 4 sec
2024-06-01 19:16:48.865581 [907070] on_message(): 2
2024-06-01 19:16:48.870826 [907070] starting pool..
2024-06-01 19:16:48.871544 worker_function(), a=907070_1
2024-06-01 19:16:48.871638 worker_function(), a=907070_2
2024-06-01 19:16:48.871695 worker_function(), a=907070_3
2024-06-01 19:16:52.865456 long sleep..
2024-06-01 19:16:56.867489 end worker_function(), a=907059_1
2024-06-01 19:16:56.867657 end worker_function(), a=907059_3
2024-06-01 19:16:56.867666 end worker_function(), a=907059_2
2024-06-01 19:16:56.868269 [907059] pool ended
2024-06-01 19:16:56.870487 [907059] finished on_message(): 1
2024-06-01 19:17:00.871746 end worker_function(), a=907070_1
2024-06-01 19:17:00.871896 end worker_function(), a=907070_2
2024-06-01 19:17:00.871903 end worker_function(), a=907070_3
2024-06-01 19:17:00.872659 [907070] pool ended
2024-06-01 19:17:00.874545 [907070] finished on_message(): 2
2024-06-01 19:17:12.865676 finished
Code:
import os
import time
import traceback
from datetime import datetime
from multiprocessing import Pool
import multiprocessing.pool
# /questions/6974695/python-process-pool-non-daemonic
class NoDaemonProcess(multiprocessing.Process):
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, value):
pass
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
class NestablePool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(NestablePool, self).__init__(*args, **kwargs)
class Message():
def __init__(self):
# self.pool_3 = Pool(3)
pass
def worker_function(self, a):
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} worker_function(), a={a}")
time.sleep(12)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} end worker_function(), a={a}")
return None
def on_message(self, message):
try:
pid = os.getpid()
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] on_message(): {message}")
# I need to make code that here I don't init new Pool()
# because my server has 72 logic processos and it takes 300ms to init
# for my task it's super long, so I want to init Pool() once, but not everytime when calling on_message()
# this could be possible solution
# but it does not work, in __init__() the Pool(3) is not initing
# res = self.pool_3.starmap_async(self.worker_function, [(f"{pid}_1",),(f"{pid}_2",),(f"{pid}_3",)]).get()
with Pool(3) as pool:
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] starting pool..")
res = pool.starmap_async(self.worker_function, [(f"{pid}_1",),(f"{pid}_2",),(f"{pid}_3",)]).get()
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] pool ended")
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} [{pid}] finished on_message(): {message}")
# os.kill(pid, 9)
except Exception as e:
print(traceback.format_exc())
print(e)
if __name__ == "__main__":
print(f"parent pid={os.getpid()}")
# /a/44719580/1802225 process.terminate()
me = Message()
for i in range(1, 3):
print()
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} start")
# starting pool non-daemonic to start pool inside
pool = NestablePool(1)
pool.starmap_async(me.on_message, [(i,)])
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} sleep 4 sec")
time.sleep(4)
print()
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} long sleep..")
time.sleep(20)
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')} finished")