Question summary: I am using `pyarrow.dataset`'s `scanner` and `write_dataset`. Knowing the average in-memory size of a record read from a Parquet file, how do I choose the values of `batch_readahead`, `fragment_readahead`, `batch_size` and `max_open_files` so as to make full use of available system resources without hitting OOM?
I have a number of Parquet files with very small row groups. I am using `pyarrow.dataset` to scan the files and then write them out using larger row groups. The basic logic is pretty straightforward:

```python
import pyarrow.dataset as ds

dataset = ds.dataset(INPATH, format='parquet')
scanner = dataset.scanner(
    batch_size=batch_size,
    batch_readahead=batch_readahead,
    fragment_readahead=fragment_readahead,
)
ds.write_dataset(
    scanner,  # pass the scanner (not the dataset), otherwise the readahead settings are ignored
    base_dir=OUTPATH,
    format='parquet',
    max_open_files=max_open_files,
    max_rows_per_file=maxrecords,
    min_rows_per_group=batch_size,
    max_rows_per_group=batch_size,
)
```
I want to optimize the values of `batch_readahead`, `fragment_readahead`, `batch_size` and `max_open_files` so as to make full use of the available CPU and RAM.
I have a function that samples my input files and determines the average size in memory of a record; call this `avg_rec_size`. (Sampling, because the Parquet file metadata only seems to show serialized sizes, which are much smaller than the in-memory sizes.)
I find that when I keep `batch_readahead`, `fragment_readahead` and `max_open_files` at 1, memory usage seems predictable as some function of `batch_size` and `avg_rec_size`. But memory consumption is then much lower than what's available, so I want to increase `batch_readahead`, `fragment_readahead` and `max_open_files` to speed up the process and make full use of the available resources. However, I'm unable to understand the relationships between all these variables:
- Is the number of scanner batches and files being read a multiple of `max_open_files`? In other words, if `batch_readahead` and `fragment_readahead` are both 1, is the scanner reading one batch for each file being written by `write_dataset`, or one batch in total, irrespective of `max_open_files`?
- If `batch_readahead` and `fragment_readahead` are both equal to 2, does that mean four batches are being read in parallel (2 batches × 2 file fragments), or just two batches (one batch per file)?
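For reference, the rough upper bound I have been working from is sketched below. It is exactly the model the questions above are trying to confirm or refute, so it may well be wrong; the multiplicative scan term and the per-open-file write buffer are unverified assumptions, not documented pyarrow behaviour:

```python
def peak_bytes_estimate(avg_rec_size, batch_size, batch_readahead,
                        fragment_readahead, max_open_files):
    """Unverified working model: every fragment being read ahead may
    itself hold batch_readahead batches in flight, and the writer
    buffers up to one row group (batch_size rows) per open output file."""
    scan = avg_rec_size * batch_size * (batch_readahead + 1) * (fragment_readahead + 1)
    write = avg_rec_size * batch_size * max_open_files
    return scan + write
```

Under this model, `avg_rec_size=200`, `batch_size=100_000`, `batch_readahead=4`, `fragment_readahead=2`, `max_open_files=1` would predict about 320 MB of peak usage, which is roughly what I want to sanity-check against the observed RSS.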