I have a manager script that’s launching some processes, then using two coroutines (one to monitor, one to gather results). For some reason only one coroutine seems to be run, what am I missing? (I don’t work with asyncio)
import multiprocessing as mp
import time
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
class Process(mp.Process):
def __init__(self, task_queue: mp.Queue, result_queue: mp.Queue):
super().__init__()
self.task_queue = task_queue
self.result_queue = result_queue
logging.info('Process init')
def run(self):
while not self.task_queue.empty():
try:
task = self.task_queue.get(timeout=1)
except mp.Queue.Empty:
logging.info('Task queue is empty')
break
time.sleep(1)
logging.info('Processing task %i (pid %i)', task, self.pid)
self.result_queue.put(task)
logging.info('Process run')
class Manager:
def __init__(self):
self.processes = []
self.task_queue = mp.Queue()
self.result_queue = mp.Queue()
self.keep_running = True
async def monitor(self):
while self.keep_running:
await asyncio.sleep(0.1)
logging.info('Task queue size: %i', self.task_queue.qsize())
logging.info('Result queue size: %i', self.result_queue.qsize())
self.keep_running = any([p.is_alive() for p in self.processes])
async def consume_results(self):
while self.keep_running:
try:
result = self.result_queue.get()
except mp.Queue.Empty:
logging.info('Result queue is empty')
continue
logging.info('Got result: %s', result)
def start(self):
# Populate the task queue
for i in range(10):
self.task_queue.put(i)
# Start the processes
for i in range(3):
p = Process(self.task_queue, self.result_queue)
p.start()
self.processes.append(p)
# Wait for the processes to finish
loop = asyncio.get_event_loop()
loop.create_task(self.monitor())
loop.create_task(self.consume_results())
manager = Manager()
manager.start()
- expecting to see the monitor queue sizes, however only the
consume_results()
is run