I’m using the `pathos` framework to run tasks concurrently in separate processes. Under the hood, this is done with `ppft`, which is part of `pathos`. My current approach uses a `pathos.multiprocessing.ProcessPool` instance. Multiple jobs are submitted to it one at a time, without blocking, using `apipe()`. The main process then supervises the worker processes using `ready()` and `get()`.
This works fine, but the results of the worker processes are only received after they finish, and I need a way to get intermediate results. I can’t find anything clear about this in the `pathos`/`ppft` docs, but judging from hints there and here, it seems this should be possible with their features. How do you do inter-process communication with `pathos`/`ppft` in combination with a `ProcessPool`?
The following demo code illustrates my approach. How could I send intermediate results to the main process, for example reporting the list of primes found so far every time its length is a multiple of 100?
```python
#!/usr/bin/env python3
import time

import pathos


def worker_func(limit):
    """
    Dummy task: Find list of all prime numbers smaller than "limit".
    """
    return limit, [
        num for num in range(2, limit)
        if all(num % i != 0 for i in range(2, num))
    ]


if __name__ == "__main__":
    pool = pathos.pools.ProcessPool()
    jobs = []
    jobs.append(pool.apipe(worker_func, 10000))
    jobs.append(pool.apipe(worker_func, 15000))
    jobs.append(pool.apipe(worker_func, 20000))

    # Poll the jobs until all of them have finished.
    count_done_jobs = 0
    while count_done_jobs < len(jobs):
        for job_idx, job in enumerate(jobs):
            if job is not None and job.ready():
                limit, primes = job.get()
                jobs[job_idx] = None
                count_done_jobs += 1
                print("Job {}: There are {} primes smaller than {}."
                      .format(job_idx, len(primes), limit))
        time.sleep(0.1)  # avoid busy-waiting between polls
```
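One possible approach, offered as a minimal sketch rather than as pathos’s documented method: create a queue through a `Manager`, pass its proxy to each job as an extra argument, and have the worker put a copy of its partial prime list on the queue every 100 primes. The main loop then drains that queue between its `ready()`/`get()` polls. So that the sketch runs without pathos installed, it swaps in the standard-library `multiprocessing.Pool` and `apply_async()` for `ProcessPool` and `apipe()`; pathos’s `ProcessPool` is built on `multiprocess`, a fork of `multiprocessing` with the same API. The names `run_jobs`, `progress_queue` and `report_every` are illustrative only.

```python
# Sketch: stream intermediate results from pool workers to the main process.
# Assumption: stdlib multiprocessing stands in for pathos.pools.ProcessPool;
# run_jobs, progress_queue and report_every are illustrative names only.
import multiprocessing
import queue


def worker_func(job_idx, limit, progress_queue, report_every=100):
    """
    Dummy task: Find list of all prime numbers smaller than "limit",
    sending the partial list every time its length is a multiple of
    "report_every".
    """
    primes = []
    for num in range(2, limit):
        if all(num % i != 0 for i in range(2, num)):
            primes.append(num)
            if len(primes) % report_every == 0:
                # put() on a manager proxy is a synchronous call to the
                # manager process, so the item is queued once it returns.
                progress_queue.put((job_idx, list(primes)))
    return limit, primes


def run_jobs(limits, report_every=100):
    """Run one job per limit and return (progress reports, final results)."""
    reports = []  # (job_idx, number of primes found so far)
    results = {}  # job_idx -> (limit, primes)
    with multiprocessing.Manager() as manager, multiprocessing.Pool() as pool:
        # Unlike a plain multiprocessing.Queue, a manager queue proxy can be
        # pickled and passed to pool tasks as an ordinary argument.
        progress_queue = manager.Queue()
        jobs = {
            idx: pool.apply_async(
                worker_func, (idx, limit, progress_queue, report_every))
            for idx, limit in enumerate(limits)
        }
        # Keep going until every job has finished and every report is read.
        while jobs or not progress_queue.empty():
            # Drain the intermediate results received so far; the timeout
            # also keeps this loop from busy-waiting.
            try:
                while True:
                    job_idx, partial = progress_queue.get(timeout=0.05)
                    reports.append((job_idx, len(partial)))
                    print("Job {}: {} primes found so far, largest is {}."
                          .format(job_idx, len(partial), partial[-1]))
            except queue.Empty:
                pass
            # Collect the final results of jobs that have finished.
            for idx in [i for i, job in jobs.items() if job.ready()]:
                limit, primes = jobs.pop(idx).get()
                results[idx] = (limit, primes)
                print("Job {}: There are {} primes smaller than {}."
                      .format(idx, len(primes), limit))
    return reports, results


if __name__ == "__main__":
    run_jobs([10000, 15000, 20000])
```

Because each `put()` completes before the worker returns, every report of a job is already in the queue by the time `ready()` reports that job as done, so none are lost when the loop exits. Porting this to pathos should, as an untested assumption, only require creating the queue with `multiprocess.Manager().Queue()` and submitting with `pool.apipe(worker_func, idx, limit, progress_queue)`. A manager queue is used because a plain `multiprocessing.Queue` may only be shared through inheritance and cannot be pickled into a pool task.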