I have a large number of asyncio tasks consuming data from a queue and writing to separate files. However, some of the files are written to multiple times in mode a+. I have written some code that simulates this kind of random processing in roughly the same way as my real-world case.
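For context, the real pipeline looks roughly like the sketch below; the queue wiring and names here are illustrative, not my actual code:

import asyncio

async def consumer(queue: asyncio.Queue, load_id: str, prefix: str) -> None:
    # Each consumer pulls an item off the shared queue and appends it to
    # its target CSV file via write_csv (the repro further down).
    while True:
        item = await queue.get()
        try:
            await write_csv(item, load_id, prefix)
        finally:
            queue.task_done()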
I am using asyncio.Lock() in the repro below to protect each file from whichever task takes ownership of writing to it, but I am still getting CSV rows that are misaligned and/or corrupted. The header also gets written multiple times, even though the file size should no longer be 0 after the header is first written.
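My mental model of the lock is sketched below; writer just stands in for the aiocsv writer, and the comment states what I am assuming rather than what I have verified:

import asyncio

async def locked_writerow(writer, row: list) -> None:
    # Assumption: while this task is inside the async with block, no other
    # task can run the body, so each CSV row should land in the file atomically.
    async with asyncio.Lock():
        await writer.writerow(row)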
What am I missing?
import asyncio
import aiofiles
import aiofiles.os
import aiocsv
import uuid
import random
import json
from pathlib import Path
from datetime import datetime, timezone

async def write_csv(item: list, load_id: str, prefix: str) -> None:
    Path("./test_files").mkdir(parents=True, exist_ok=True)
    file_path = Path("./test_files").joinpath(f"{prefix}_{load_id}.csv")
    # Asynchronously write to our file
    async with aiofiles.open(file_path, mode="a+", newline="") as f:
        print(f"INFO: writing file: {file_path.resolve()}")
        w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)
        print(f"file size: {await aiofiles.os.path.getsize(file_path)}")
        # If the file is empty, write the header
        if await aiofiles.os.path.getsize(file_path) == 0:
            print("file was empty! writing header")
            # Write the header
            async with asyncio.Lock():
                await w.writerow([
                    "response",
                    "load_id",
                    "last_updated_timestamp_utc"
                ])
        # Do something special for a specific file name;
        # I am just trying to simulate more random data processing
        if prefix == "file_one":
            # Shuffle the chunks again
            random.shuffle(item)  # shuffle in place (random.shuffle returns None)
        # Write the data
        for chunk in item:
            async with asyncio.Lock():
                await w.writerow([
                    json.dumps(chunk),
                    load_id,
                    datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                ])

async def main() -> None:
    # Create fake data
    items: list[list[str]] = [["hello", "world"], ["asyncio", "test"]] * 500
    # Possible file prefixes
    prefixes: list[str] = ["file_one", "file_two"]
    tasks: list = []
    load_id = str(uuid.uuid4())
    for i in items:
        # Randomly assign which file we will write to
        task = asyncio.create_task(write_csv(i, load_id, random.choice(prefixes)))
        tasks.append(task)
    errors = await asyncio.gather(*tasks, return_exceptions=True)
    # print(errors)

if __name__ == "__main__":
    asyncio.run(main())