I am trying to build a batched queue processor with asyncio
. The way it should work is that one should be able to push individual requests into the queue and that the queue processor would batch these up based on some heuristic, process the batch and that it would then release the results back to the calling tasks. That each calling task should be blocked/awaited until it’s result is available.
Here is what I got to so far:
import asyncio
from typing import TypedDict, List, Any, Dict
class Event(TypedDict):
id: int
payload: Any
count_tokens = lambda content: len(str(content))
class BatchProcessor:
queue: asyncio.Queue[Event] = asyncio.Queue()
lock = asyncio.Lock()
batch_processed = asyncio.Event()
results: Dict[int, Any] = {}
counter: int = 0
def __init__(self, tokens_per_batch = 100) -> None:
self.queue = asyncio.Queue()
self.tokens_per_batch = tokens_per_batch
self.processor_task = asyncio.create_task(self.processor())
async def next_counter(self):
async with self.lock:
self.counter += 1
return self.counter
async def processor(self) -> None:
batch: List[Event] = []
tokens = 0
while True:
event = await self.queue.get()
batch.append(event)
tokens += count_tokens(event)
print(f"Tokens: {tokens}")
if tokens > self.tokens_per_batch:
print(f"Triggering batch of {len(batch)} items")
self.results.update(await self.process_batch(batch))
print(f"Notifying batch processed event for {self.results} items")
self.batch_processed.set()
batch = []
tokens = 0
async def process_batch(self, batch: List[Any]) -> Dict[int, Any]:
print(f"Processing batch of {len(batch)} items")
await asyncio.sleep(1)
return {event.id: "{event.payload} processed" for event in batch}
async def add_to_batch(self, payload: Any ) -> None:
id = await self.next_counter()
self.queue.put_nowait(Event(id=id, payload=payload))
# wait for event
print(f"Added request {id} to batch")
while True:
await self.batch_processed.wait()
print(f"Checking for result of request {id}")
if id in self.results:
result = self.results.pop(id)
print(f"Result for request {id}: {result}")
return result
async def main():
batch_processor = BatchProcessor(tokens_per_batch=100)
# Simulating client requests
coros = [batch_processor.add_to_batch(f"data_{i}") for i in range(1, 11)]
# Wait for all requests to be processed
results = await asyncio.gather(*coros)
print("Results:", results)
# Run the main function
asyncio.run(main())
Producing this output (and then getting stuck):
Added request 1 to batch
Added request 2 to batch
Added request 3 to batch
Added request 4 to batch
Added request 5 to batch
Added request 6 to batch
Added request 7 to batch
Added request 8 to batch
Added request 9 to batch
Added request 10 to batch
Tokens: 30
Tokens: 60
Tokens: 90
Tokens: 120
Triggering batch of 4 items
Processing batch of 4 items
Although I have worked with asyncio
for a little bit, I am realizing that my understanding is severely lacking. For instance, it appears that the process_batch
never returns. Also, I am unsure where and when the batch_processed
event should be cleared again.
Next, I was planning on adding a flush
method that would force the remaining events in the batch to get processed, but first am need to get the basics working.
I would be most grateful for any hints.