I have some code that uses asyncio to map over a large dataset by spawning a few hundred workers. Is there a way to use multiple cores to make it even faster?
from tqdm import tqdm
import asyncio
import aiohttp
import random
from pathlib import Path
from collections import Counter

WORKER_COUNT = 500  # Number of concurrent workers
async def fetch_and_save(input_item, session):
    # do work (fetch the item over the session and save it)
    print("working...")
    return result_code  # status code produced by the real work
async def worker(task_queue, session, results, pbar):
    # Each worker drains the shared queue until it is empty
    while not task_queue.empty():
        item_id = await task_queue.get()
        result = await fetch_and_save(item_id, session)
        results[result] += 1
        pbar.update(1)  # advance the shared progress bar
        task_queue.task_done()
async def main():
    async with aiohttp.ClientSession() as session:
        task_queue = asyncio.Queue()
        results = Counter()
        # Queue all items
        ids = read_items_from_disk()
        random.seed(42)
        random.shuffle(ids)
        for item_id in ids:
            await task_queue.put(item_id)
        pbar = tqdm(total=task_queue.qsize())
        # Create worker coroutines
        workers = [asyncio.create_task(worker(task_queue, session, results, pbar))
                   for _ in range(WORKER_COUNT)]
        # Wait for all tasks in the queue to be processed
        await task_queue.join()
        # Cancel all remaining workers after the queue is empty
        for worker_task in workers:
            worker_task.cancel()

asyncio.run(main())
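
For reference, here is roughly what I had in mind: a minimal, untested sketch that shards the IDs across a ProcessPoolExecutor so each process runs its own event loop with the same queue-and-workers pattern. NUM_PROCESSES, run_shard, and the strided sharding are names and choices I made up, fetch_and_save is stubbed, and I dropped the progress bar to keep it short:

import asyncio
import random
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

import aiohttp

WORKER_COUNT = 100   # concurrent workers per process
NUM_PROCESSES = 4    # assumption: roughly one shard per core

async def fetch_and_save(item_id, session):
    # stub for the real fetch/save work
    return 200

async def run_shard_async(ids):
    # One event loop per process, same worker pattern as above
    task_queue = asyncio.Queue()
    results = Counter()
    for item_id in ids:
        task_queue.put_nowait(item_id)

    async def worker(session):
        while not task_queue.empty():
            item_id = await task_queue.get()
            results[await fetch_and_save(item_id, session)] += 1
            task_queue.task_done()

    async with aiohttp.ClientSession() as session:
        workers = [asyncio.create_task(worker(session)) for _ in range(WORKER_COUNT)]
        await task_queue.join()
        for w in workers:
            w.cancel()
    return results

def run_shard(ids):
    # Module-level so it can be pickled into the child processes
    return asyncio.run(run_shard_async(ids))

def main():
    ids = read_items_from_disk()  # same helper as above
    random.seed(42)
    random.shuffle(ids)
    # Strided split into NUM_PROCESSES roughly equal shards
    shards = [ids[i::NUM_PROCESSES] for i in range(NUM_PROCESSES)]
    totals = Counter()
    with ProcessPoolExecutor(max_workers=NUM_PROCESSES) as pool:
        for shard_results in pool.map(run_shard, shards):
            totals += shard_results
    print(totals)

if __name__ == "__main__":
    # Guard is required when child processes are spawned (Windows/macOS)
    main()

Is something along these lines the right approach, or is there a cleaner way to combine asyncio with multiple processes?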