I have often encountered the situation of loading many files into Python with an external library; for example, loading many '.npy' files with np.load.
Assuming that the external loading function (here np.load) is itself optimized, is it generally possible to improve loading efficiency by avoiding purely sequential loading?
To see this, I am testing multithreaded, multiprocess and asynchronous loading against a sequential baseline, for a minimal example of loading 1200 '.npy' files, each containing a numpy array of shape (1000, 1000), about 10 GB in total.
The minimal example is
<code>import os
import asyncio
from time import perf_counter

import numpy as np
from joblib import Parallel, delayed

m = 1200  # number of files
n = 1000  # array dimension
k = 2     # number of repetitions for statistics

# create data
os.makedirs("data", exist_ok=True)
for i in range(m):
    a = np.random.rand(n, n)
    np.save("data/file{}.npy".format(i), a)

# read data
readTimes = []
for i in range(k):
    startTime = perf_counter()
    r = mainThread()  # alternatives: mainSeq(), asyncio.run(mainAsync())
    endTime = perf_counter()
    readTime = endTime - startTime
    readTimes.append(readTime)

readTimes = np.array(readTimes)
print("mean", np.mean(readTimes), "seconds")
print("error", np.std(readTimes), "seconds")
</code>
where mainSeq(), mainThread() and mainAsync() are
<code># sequential loading
def mainSeq():
    return [np.load("data/file{}.npy".format(i)) for i in range(m)]

# multithreaded loading
def mainThread():
    return Parallel(backend="threading", n_jobs=10)(
        delayed(np.load)("data/file{}.npy".format(i)) for i in range(m))

# async loading: np.load blocks, so each call is dispatched to the
# event loop's default thread pool via run_in_executor
async def asyncLoad(filepath):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, np.load, filepath)

async def mainAsync():
    paths = ["data/file{}.npy".format(i) for i in range(m)]
    tasks = [asyncLoad(filepath) for filepath in paths]
    return await asyncio.gather(*tasks)
</code>
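For the multiprocess option, a minimal sketch that can be swapped in on the r = ... line would be the following (the name mainProc and the choice of 10 worker processes are arbitrary):
<code># multiprocess loading (sketch): each worker loads one file with np.load,
# and the resulting array is pickled back to the parent process, which adds
# serialization overhead on top of the disk read
from concurrent.futures import ProcessPoolExecutor

def mainProc():
    paths = ["data/file{}.npy".format(i) for i in range(m)]
    with ProcessPoolExecutor(max_workers=10) as executor:
        return list(executor.map(np.load, paths))
</code>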
With this example I get roughly a factor of 2 improvement in loading time over the sequential version. Is this the right way of using asyncio to speed up file loading? Can we expect a better improvement?