I’m attempting to set up a persistent queue that will continuously accept and handle tasks, within a GUI app. Based on this example from the docs, I can get a barebones queue proof-of-concept working on its own:
import asyncio
import random


async def worker(name, queue):
    print(f"{name} created")
    while True:
        print(f"{name} awaiting job")
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def add_to_queue(queue):
    for _ in range(6):
        sleep_for = random.uniform(0.05, 1.0)
        print(f"Putting {sleep_for:.2f} on the queue")
        queue.put_nowait(sleep_for)


async def create_workers(queue):
    tasks = []
    for i in range(1, 4):
        task = asyncio.create_task(worker(f'Worker #{i}', queue))
        tasks.append(task)


async def main():
    queue = asyncio.Queue()
    asyncio.create_task(create_workers(queue))
    await asyncio.sleep(1)
    asyncio.create_task(add_to_queue(queue))
    await asyncio.sleep(5)


asyncio.run(main())
Output:
Worker #1 created
Worker #1 awaiting job
Worker #2 created
Worker #2 awaiting job
Worker #3 created
Worker #3 awaiting job
Putting 0.83 on the queue
Putting 0.39 on the queue
Putting 0.19 on the queue
Putting 0.41 on the queue
Putting 0.36 on the queue
Putting 0.62 on the queue
Worker #3 has slept for 0.19 seconds
Worker #3 awaiting job
Worker #2 has slept for 0.39 seconds
Worker #2 awaiting job
Worker #3 has slept for 0.41 seconds
Worker #3 awaiting job
Worker #2 has slept for 0.36 seconds
Worker #2 awaiting job
Worker #1 has slept for 0.83 seconds
Worker #1 awaiting job
Worker #3 has slept for 0.62 seconds
Worker #3 awaiting job
(The sleep in the middle is just a sanity check to make sure it works even when the jobs are added after the workers are already started.)
However, I must be missing something when it comes to sticking this in a Toga app. Here’s the MRE I have so far:
import asyncio
import random

import toga


class TestApp(toga.App):
    def startup(self):
        self.add_background_task(self.__class__.initialize_queue)
        self.main_window = toga.MainWindow()
        self.main_window.content = toga.Button(
            "Put an item on the queue",
            on_press=self.put_on_queue,
        )
        self.main_window.show()

    async def initialize_queue(self):
        print("Initializing queue...")
        self.queue = asyncio.Queue()
        self.workers = []
        for i in range(1, 4):
            worker = asyncio.create_task(self.worker(f"Worker #{i}"))
            self.workers.append(worker)
        print(f"Queue intizialized.")

    async def worker(self, name):
        print(f"{name} created")
        while True:
            print(f"{name} awaiting job")
            sleep_for = await queue.get()
            await asyncio.sleep(sleep_for)
            queue.task_done()
            print(f'{name} has slept for {sleep_for:.2f} seconds')

    async def put_on_queue(self, caller):
        sleep_for = random.uniform(0.05, 1.0)
        print(f"Putting {sleep_for:.2f} seconds on the queue")
        self.queue.put_nowait(sleep_for)


if __name__ == "__main__":
    TestApp("Queue Test", "org.example.example").main_loop()
Output (after pressing the button a handful of times):
Initializing queue...
Queue intizialized.
Worker #1 created
Worker #1 awaiting job
Worker #2 created
Worker #2 awaiting job
Worker #3 created
Worker #3 awaiting job
Putting 0.50 seconds on the queue
Putting 0.75 seconds on the queue
Putting 0.16 seconds on the queue
Putting 0.93 seconds on the queue
Putting 0.77 seconds on the queue
Putting 0.75 seconds on the queue
The workers are there waiting, and the jobs are put on the queue, but the workers never pick them up.
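One thing worth ruling out is that the workers are failing silently rather than waiting. asyncio stores an exception raised inside a task on the Task object, and it is only surfaced when the task is awaited or its result is retrieved, so a worker that raises right after printing "awaiting job" would look identical to an idle one. Below is a minimal diagnostic sketch in plain asyncio (no Toga). The names report_failure and failures are hypothetical, and the ValueError is only a stand-in for whatever error a worker might hit.

```python
import asyncio

failures = []  # hypothetical collector for anything a worker raises


async def worker(name):
    # Stand-in failure; represents any error raised inside a worker.
    raise ValueError(f"{name} failed")


def report_failure(task):
    # Runs when the task finishes; retrieves the stored exception, if any.
    if not task.cancelled() and task.exception() is not None:
        failures.append(repr(task.exception()))


async def main():
    task = asyncio.create_task(worker("Worker #1"))
    task.add_done_callback(report_failure)
    await asyncio.sleep(0.1)  # give the worker time to run and fail


asyncio.run(main())
print(failures)
```

Attaching a callback like this to each task in self.workers should show whether the workers are still alive after their first "awaiting job" message.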
Admittedly I don’t have the firmest grasp on the different ways of running async code from a synchronous method in Python. Instead of self.add_background_task(self.__class__.initialize_queue), I’ve tried asyncio.create_task(self.initialize_queue()), but it fails because the app’s loop hasn’t been started yet; and of course self.loop.run_until_complete(self.initialize_queue()) creates the workers but then blocks Toga’s event loop from ever starting.
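For reference, the create_task failure can be reproduced without Toga. This is a minimal sketch assuming only the standard library: the startup function here is a hypothetical stand-in for a synchronous method that runs before any event loop is running, not Toga's actual startup.

```python
import asyncio


async def initialize_queue():
    return asyncio.Queue()


def startup():
    # Synchronous context: no event loop is running at this point.
    coro = initialize_queue()
    try:
        asyncio.create_task(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return None


error = startup()
print(error)
```

asyncio.create_task needs a running loop in the current thread, which is why it can only be called from code the loop itself is already executing.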
What’s the correct way to make the queue function in this context?