I’m trying to write a method that kills all the processes spawned by a ProcessPoolExecutor
, without waiting the processes to terminate their jobs (as executor.shutdown(wait=False)
would do).
This is my code so far (I use devtools.debug
, but you can use print
)
import os
import signal
import time
from concurrent.futures import ProcessPoolExecutor
from devtools import debug
def task(x):
time.sleep(5)
return 42
class MyPoolExecutor(ProcessPoolExecutor):
def terminate_processes(self):
debug("Initial processes", self._processes)
for pid in self._processes.keys():
os.kill(pid, signal.SIGTERM)
debug("Right after os.kill", self._processes)
time.sleep(1)
debug("After time.sleep(1)", self._processes)
self.shutdown(wait=False)
debug("After shutdown", self._processes)
return
if __name__ == "__main__":
# SUBMIT
with MyPoolExecutor() as executor:
futures = [executor.submit(task, x) for x in range(5)]
debug("Before terminate_process", futures)
executor.terminate_processes()
debug("After terminate_process", futures)
debug(futures)
# MAP
with MyPoolExecutor() as executor:
futures = executor.map(task, [x for x in range(5)])
# debug("Before terminate_process", futures) # ! BLOCKING
executor.terminate_processes()
debug("After terminate_process", futures)
debug(futures)
When running the script, this is the output:
SUBMIT
thread.py:33 <module>
'Before terminate_process' (str) len=24
futures: [
<Future at 0x10d74a380 state=running>,
<Future at 0x10d749480 state=running>,
<Future at 0x10d749330 state=running>,
<Future at 0x10d7492a0 state=running>,
<Future at 0x10d7490f0 state=running>,
] (list) len=5
thread.py:14 MyPoolExecutor.terminate_processes
'Initial processes' (str) len=17
self._processes: {
62408: <SpawnProcess name='SpawnProcess-1' pid=62408 parent=62404 started>,
62409: <SpawnProcess name='SpawnProcess-2' pid=62409 parent=62404 started>,
62410: <SpawnProcess name='SpawnProcess-3' pid=62410 parent=62404 started>,
62411: <SpawnProcess name='SpawnProcess-4' pid=62411 parent=62404 started>,
62412: <SpawnProcess name='SpawnProcess-5' pid=62412 parent=62404 started>,
} (dict) len=5
thread.py:18 MyPoolExecutor.terminate_processes
'Right after os.kill' (str) len=19
self._processes: {
62408: <SpawnProcess name='SpawnProcess-1' pid=62408 parent=62404 started>,
62409: <SpawnProcess name='SpawnProcess-2' pid=62409 parent=62404 started>,
62410: <SpawnProcess name='SpawnProcess-3' pid=62410 parent=62404 started>,
62411: <SpawnProcess name='SpawnProcess-4' pid=62411 parent=62404 started>,
62412: <SpawnProcess name='SpawnProcess-5' pid=62412 parent=62404 started>,
} (dict) len=5
thread.py:21 MyPoolExecutor.terminate_processes
'After time.sleep(1)' (str) len=19
self._processes: {
62408: <SpawnProcess name='SpawnProcess-1' pid=62408 parent=62404 stopped exitcode=-SIGTERM>,
62409: <SpawnProcess name='SpawnProcess-2' pid=62409 parent=62404 stopped exitcode=-SIGTERM>,
62410: <SpawnProcess name='SpawnProcess-3' pid=62410 parent=62404 stopped exitcode=-SIGTERM>,
62411: <SpawnProcess name='SpawnProcess-4' pid=62411 parent=62404 stopped exitcode=-SIGTERM>,
62412: <SpawnProcess name='SpawnProcess-5' pid=62412 parent=62404 stopped exitcode=-SIGTERM>,
} (dict) len=5
thread.py:24 MyPoolExecutor.terminate_processes
'After shutdown' (str) len=14
self._processes: None (NoneType)
thread.py:35 <module>
'After terminate_process' (str) len=23
futures: [
<Future at 0x10d74a380 state=finished raised BrokenProcessPool>,
<Future at 0x10d749480 state=finished raised BrokenProcessPool>,
<Future at 0x10d749330 state=finished raised BrokenProcessPool>,
<Future at 0x10d7492a0 state=finished raised BrokenProcessPool>,
<Future at 0x10d7490f0 state=finished raised BrokenProcessPool>,
] (list) len=5
thread.py:36 <module>
futures: [
<Future at 0x10d74a380 state=finished raised BrokenProcessPool>,
<Future at 0x10d749480 state=finished raised BrokenProcessPool>,
<Future at 0x10d749330 state=finished raised BrokenProcessPool>,
<Future at 0x10d7492a0 state=finished raised BrokenProcessPool>,
<Future at 0x10d7490f0 state=finished raised BrokenProcessPool>,
] (list) len=5
MAP
thread.py:14 MyPoolExecutor.terminate_processes
'Initial processes' (str) len=17
self._processes: {
62420: <SpawnProcess name='SpawnProcess-6' pid=62420 parent=62404 started>,
62421: <SpawnProcess name='SpawnProcess-7' pid=62421 parent=62404 started>,
62422: <SpawnProcess name='SpawnProcess-8' pid=62422 parent=62404 started>,
62423: <SpawnProcess name='SpawnProcess-9' pid=62423 parent=62404 started>,
62424: <SpawnProcess name='SpawnProcess-10' pid=62424 parent=62404 started>,
} (dict) len=5
thread.py:18 MyPoolExecutor.terminate_processes
'Right after os.kill' (str) len=19
self._processes: {
62420: <SpawnProcess name='SpawnProcess-6' pid=62420 parent=62404 stopped exitcode=-SIGTERM>,
62421: <SpawnProcess name='SpawnProcess-7' pid=62421 parent=62404 stopped exitcode=-SIGTERM>,
62422: <SpawnProcess name='SpawnProcess-8' pid=62422 parent=62404 stopped exitcode=-SIGTERM>,
62423: <SpawnProcess name='SpawnProcess-9' pid=62423 parent=62404 stopped exitcode=-SIGTERM>,
62424: <SpawnProcess name='SpawnProcess-10' pid=62424 parent=62404 stopped exitcode=-SIGTERM>,
} (dict) len=5
thread.py:21 MyPoolExecutor.terminate_processes
'After time.sleep(1)' (str) len=19
self._processes: {
62420: <SpawnProcess name='SpawnProcess-6' pid=62420 parent=62404 stopped exitcode=-SIGTERM>,
62421: <SpawnProcess name='SpawnProcess-7' pid=62421 parent=62404 stopped exitcode=-SIGTERM>,
62422: <SpawnProcess name='SpawnProcess-8' pid=62422 parent=62404 stopped exitcode=-SIGTERM>,
62423: <SpawnProcess name='SpawnProcess-9' pid=62423 parent=62404 stopped exitcode=-SIGTERM>,
62424: <SpawnProcess name='SpawnProcess-10' pid=62424 parent=62404 stopped exitcode=-SIGTERM>,
} (dict) len=5
thread.py:24 MyPoolExecutor.terminate_processes
'After shutdown' (str) len=14
self._processes: None (NoneType)
thread.py:44 <module>
'After terminate_process' (str) len=23
futures: <generator object _chain_from_iterable_of_lists at 0x10d735cb0> (generator)
!!! error pretty printing value: BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.')
thread.py:45 <module>
futures: (
) (generator)
I have two questions:
- is this the right way to kill processes?
- what is going on with
Future
s in themap
case? Why is theBrokenProcessPool
registred inFuture.state
forsubmit
‘sFuture
s, while it’s raised bymap
‘sFuture
s?
Thank you