I have a 3rd-party CLI executable I need to call from my Python code. The calculations are CPU-heavy and I need to call it around 50-100 times. The executable itself is multi-threaded to some degree, but not in all steps, and I have a lot of cores available.
This means I want multiple subprocesses running at once, but not all of them at the same time. So I need to submit a batch of them, track when one completes, and start a new one in its place to keep CPU usage high.
I had a working version. It was very naive, but it worked: it simply waited for the first submitted process of the current batch to complete. However, the run time is data-dependent, so at some point a very long-running process was the “first” one submitted of the remaining ones, and it blocked any other process from being submitted until it completed. The executable also has a long sequential/IO(?) phase, during which this long-running process can sit at 1% CPU for hours while the CPU is otherwise mostly idle.
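For context, the old version had roughly this shape (a simplified sketch, not my exact code; cmd building and logging are omitted as in the snippet further down):

    import subprocess

    rs_processes = []
    for path in files:
        # Building cmd omitted
        rs_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        rs_processes.append(rs_process)
        if len(rs_processes) >= num_concurrent_processes:
            # block on the oldest submitted process, regardless of which one actually finishes first
            oldest = rs_processes.pop(0)
            stdout_data, stderr_data = oldest.communicate()
            # logging omitted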
So I needed to optimize my process submission code:
import subprocess
import time

num_concurrent_processes = 6
rs_processes = []

for path in files:
    # Building cmd omitted
    rs_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
                                  bufsize=1, creationflags=0x00000008)  # 0x00000008 = DETACHED_PROCESS (Windows)
    rs_processes.append(rs_process)
    if len(rs_processes) >= num_concurrent_processes:
        # wait until at least one process has completed before submitting the next one
        continue_polling = True
        while continue_polling:
            for idx, proc in enumerate(rs_processes):
                poll = proc.poll()
                if poll is not None:
                    # process complete
                    stdout_data, stderr_data = proc.communicate()
                    # removed some logging in case of error
                    # remove completed process from list
                    del rs_processes[idx]
                    # exit polling loop as a new subprocess can be submitted
                    continue_polling = False
                    break
            if continue_polling:
                # put some brakes on the polling
                time.sleep(10)
I block the submission of further subprocesses once the limit is reached. Then I poll until I find a process that has finished (poll() is not None, according to the Python documentation). I get the output from that process via communicate() and do some action on it (logging, not shown). Then I remove the process from the “tracking list” and exit the polling loop.
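My understanding from the Python documentation is that poll() is non-blocking: it returns None while the child is still running and the return code once it has exited. A quick standalone check (throwaway child process just for illustration, not my real command):

    import subprocess
    import sys
    import time

    p = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(2)"])
    print(p.poll())  # None -> child still running
    time.sleep(3)
    print(p.poll())  # 0 -> child has exited, return code is available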
But there is some kind of logic flaw in the code: when running it, “stale” processes accumulate. They all sit at 0% CPU, but somehow the polling doesn't consider them completed. And once there are num_concurrent_processes stale processes, progress stops entirely. What is going wrong here? Does using poll() mean I now have to terminate the processes manually? With the old method all processes ran just fine and stopped on their own, and I'm using the same data here with the new method.