I want to read a Parquet file batch by batch, in parallel. I do this by grouping several contiguous row groups together and reading each group through an arrow::RecordBatchReader. When I monitor the memory usage during reading, I see that it keeps growing until the whole file has been read. I would like to reduce the memory footprint by releasing the memory as soon as a thread finishes reading its current batch.
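(To put a number on "memory usage": one simple check is the Arrow memory pool's own counters. The helper below is only an illustrative sketch; log_pool_usage is a made-up name and not part of my actual monitoring code.)

#include <arrow/memory_pool.h>
#include <iostream>
#include <string>

// Illustrative only: print what the default Arrow pool currently holds.
// This counts allocations made through the pool, not the whole process RSS.
void log_pool_usage(const std::string& tag) {
  auto* pool = ::arrow::default_memory_pool();
  std::cout << tag << " [" << pool->backend_name() << "] "
            << "allocated=" << pool->bytes_allocated() << " bytes, "
            << "peak=" << pool->max_memory() << " bytes" << std::endl;
}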
I tried arrow::RecordBatchReader::Close(), but it did not help.
Here is my code:
#include <algorithm>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <arrow/api.h>
#include <arrow/util/functional.h>   // arrow::internal::call_traits
#include <arrow/util/future.h>
#include <arrow/util/range.h>        // arrow::internal::Iota
#include <arrow/util/thread_pool.h>  // arrow::internal::GetCpuThreadPool
#include <arrow/util/vector.h>       // arrow::internal::UnwrapOrRaise
#include <parquet/arrow/reader.h>
#include <parquet/properties.h>

// Same shape as arrow::internal::ParallelForAsync: submit one task per input
// to the executor and collect all results into a single future.
template <class FUNCTION, typename T,
          typename R = typename arrow::internal::call_traits::return_type<FUNCTION>::ValueType>
arrow::Future<std::vector<R>> ParallelForAsync_test(
    std::vector<T> inputs, FUNCTION&& func,
    arrow::internal::Executor* executor = arrow::internal::GetCpuThreadPool()) {
  std::vector<arrow::Future<R>> futures(inputs.size());
  for (size_t i = 0; i < inputs.size(); ++i) {
    ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
  }
  return arrow::All(std::move(futures))
      .Then([](const std::vector<arrow::Result<R>>& results) -> arrow::Result<std::vector<R>> {
        return arrow::internal::UnwrapOrRaise(results);
      });
}
arrow::Status read_whole_file(std::string file, int batch_size, int& size) {
  ::arrow::MemoryPool* pool = ::arrow::default_memory_pool();

  auto reader_properties = parquet::ReaderProperties(pool);
  reader_properties.set_buffer_size(4096 * 4);
  reader_properties.enable_buffered_stream();

  auto arrow_reader_props = parquet::ArrowReaderProperties();
  arrow_reader_props.set_batch_size(4 * 1024);
  arrow_reader_props.set_use_threads(true);

  parquet::arrow::FileReaderBuilder reader_builder;
  ARROW_RETURN_NOT_OK(reader_builder.OpenFile(file, /*memory_map=*/false, reader_properties));
  reader_builder.memory_pool(pool);
  reader_builder.properties(arrow_reader_props);

  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
  ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build());

  auto p_reader = arrow_reader->parquet_reader();
  int nrgs = p_reader->metadata()->num_row_groups();
  int64_t nrows = p_reader->metadata()->num_rows();
  int ncolumns = p_reader->metadata()->num_columns();

  auto cpu_executor = ::arrow::internal::GetCpuThreadPool();

  // One RecordBatchReader per group of `batch_size` consecutive row groups.
  int rg_batchsize = nrgs / batch_size;
  std::vector<std::shared_ptr<arrow::RecordBatchReader>> vec_reader;
  for (int j = 0; j <= rg_batchsize; j++) {
    if (j * batch_size >= nrgs) break;  // nrgs is an exact multiple of batch_size
    std::shared_ptr<arrow::RecordBatchReader> rb_reader;
    ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(
        arrow::internal::Iota(j * batch_size, std::min((j + 1) * batch_size, nrgs)), &rb_reader));
    vec_reader.emplace_back(std::move(rb_reader));
  }
  size = vec_reader.size();
  std::cout << size << std::endl;
  auto thread_start = std::chrono::high_resolution_clock::now();

  auto read_recordbatch = [ncolumns, thread_start](
                              size_t i, std::shared_ptr<::arrow::RecordBatchReader> reader)
      // -> ::arrow::Result<std::shared_ptr<::arrow::Array>> {
      -> ::arrow::Result<bool> {
    auto io_start = std::chrono::high_resolution_clock::now();

    // Materialize all batches covered by this reader into one table.
    auto result = reader->ToTable();
    std::vector<std::shared_ptr<::arrow::Array>> vec_array;
    if (result.ok()) {
      auto table = *result;
      for (int col = 0; col < ncolumns; col++) {
        // ChunkedArrayToArray is my own helper (definition omitted).
        auto col_result = ChunkedArrayToArray(table->column(col));
        if (col_result.ok()) {
          vec_array.emplace_back(*col_result);
        }
      }
    }

    auto io_end = std::chrono::high_resolution_clock::now();
    ARROW_RETURN_NOT_OK(reader->Close());

    std::cout << "thread " << i << " " << vec_array[0]->length() << " rows "
              << "start_overhead "
              << std::chrono::duration<double, std::milli>(io_start - thread_start).count() << " ms "
              << "io_overhead "
              << std::chrono::duration<double, std::milli>(io_end - io_start).count() << " ms"
              << std::endl;
    // return vec_array[0]->length();
    // return vec_array[0];
    return true;
  };

  auto re = ParallelForAsync_test(std::move(vec_reader), read_recordbatch, cpu_executor)
                .MoveResult();
  auto re_chka = re.ValueOrDie();
  return ::arrow::Status::OK();
}
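For completeness, I drive it roughly like this (simplified; the file path and the batch size here are just placeholders from my test, not meaningful values):

int main() {
  int n_readers = 0;
  // "/path/to/data.parquet" is a placeholder; batch_size is the number of
  // consecutive row groups merged into one RecordBatchReader.
  auto st = read_whole_file("/path/to/data.parquet", /*batch_size=*/4, n_readers);
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  std::cout << "created " << n_readers << " readers" << std::endl;
  return 0;
}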