I have a large text file (100 GB) which I need to process. After the first 109 rows, the file has a repeating pattern: a timestamp, 16 rows of data, and a blank line. The timestamp refers to the next 16 rows of data, each containing 10000 tab-separated values with a comma as the decimal character. Each of these rows corresponds to a different acquisition channel.
The purpose of my script is to convert that data to Parquet, giving each row the proper timestamp and channel number, without loading the entire dataset into memory.
Because the PyArrow CSV reader cannot return the timestamp rows, I used mmap to memory-map the file, count the total number of lines, and collect only the lines that match the timestamp pattern. I then created two arrays, one for the channel and one for the timestamp, each as long as the number of data rows, and used open_csv to stream the data into a Parquet file. Is there any optimization that could be applied to improve processing speed?
My code looks as follows:
import mmap
import re

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import csv

def count_lines_mmap(filepath):
    with open(filepath, 'r+b') as f:
        mm = mmap.mmap(f.fileno(), 0)
        lines = 0
        matching_lines = []
        while line := mm.readline():
            lines += 1
            # Decode once; keep the trailing newline (the timestamp format below expects it)
            decoded = line.decode()
            if re.match(REGEX_TIMESTAMP, decoded):
                matching_lines.append(decoded)
        mm.close()
    return lines, matching_lines

def process_file(input_file, output_file):
    # Extract wavelength information
    wavelength_start, wavelength_delta, num_points = extract_wavelength_info(input_file)

    lines, matching_lines = count_lines_mmap(input_file)
    # Each group is 18 lines: 1 timestamp + 16 channel rows + 1 blank line
    if (lines - SKIP_ROWS) / 18 != len(matching_lines):
        raise ValueError("Wrong number of timestamps found in the file.")
    else:
        print('Matching number of lines.')

    # One timestamp per group, repeated for the 16 channel rows of that group
    n_groups = len(matching_lines)
    lines_array = pa.array(pd.to_datetime(
        np.repeat(matching_lines, 16),
        format="%d.%m.%Y %H.%M.%S.%f\n")
    )
    channels_array = pa.array(np.tile(CHANNEL_RANGE, n_groups))
    # Generate wavelength column names
    wavelengths = np.arange(wavelength_start,
                            wavelength_start + num_points * wavelength_delta,
                            wavelength_delta)
    wl_cols = [f"{wl:.3f}" for wl in wavelengths]

    def handle_invalid_row(invalid_row):
        # Timestamp and blank lines do not have the expected number of columns, so skip them
        return "skip"

    schema = pa.schema([
        pa.field(i, pa.float32()) for i in wl_cols
    ])
    writer_schema = pa.schema([
        pa.field(i, pa.float32()) for i in wl_cols
    ] + [
        pa.field("Channel", pa.uint8()),
        pa.field("Timestamp", pa.timestamp('ns'))
    ])
    # Define the CSV read options
    read_options = csv.ReadOptions(
        column_names=wl_cols,
        skip_rows=SKIP_ROWS,
        block_size=1024 * 1024 * 64  # 64 MB chunks
    )
    parse_options = csv.ParseOptions(
        delimiter="\t",
        invalid_row_handler=handle_invalid_row
    )
    convert_options = csv.ConvertOptions(
        column_types=schema,
        decimal_point=','
    )
    writer = None
    start_index = 0
    end_index = 0
    with csv.open_csv(
        input_file,
        read_options=read_options,
        parse_options=parse_options,
        convert_options=convert_options,
    ) as reader:
        for next_chunk in reader:
            if next_chunk is None:
                break
            end_index += len(next_chunk)
            if writer is None:
                writer = pq.ParquetWriter(
                    output_file,
                    schema=writer_schema,
                    # next_chunk.schema,
                )
            next_table = pa.Table.from_batches([next_chunk])
            # Add channel and timestamp columns
            next_table = next_table.append_column("Channel", channels_array[start_index:end_index])
            next_table = next_table.append_column("Timestamp", lines_array[start_index:end_index])
            writer.write_table(next_table)
            start_index = end_index
    writer.close()

It would be better to simply let the CSV parser take the whole file and then, for every batch from the CSV, apply the filter condition you are currently applying to the raw input. You can do that with pyarrow.compute, and make sure you concatenate the resulting batches until you have a bigger batch to give to the Parquet writer.
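As a rough illustration of the pyarrow.compute part, here is a minimal sketch. It assumes each batch still carries the raw line text in a hypothetical string column named "raw" (which is not how the reader in the question is configured), so treat it as a pattern rather than a drop-in replacement:

import pyarrow as pa
import pyarrow.compute as pc

def split_timestamps(batch, timestamp_regex):
    # Build a boolean mask marking the lines that look like timestamps
    # (anchor the pattern with ^ to mimic re.match)
    table = pa.Table.from_batches([batch])
    is_timestamp = pc.match_substring_regex(table["raw"], timestamp_regex)
    # Timestamp lines (what the mmap scan currently extracts) vs. data lines
    timestamp_rows = table.filter(is_timestamp)
    data_rows = table.filter(pc.invert(is_timestamp))
    return timestamp_rows, data_rows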
An easy win for the current code is probably to concatenate the batches you get from the CSV reader before you hand them to the Parquet writer. The optimal batch size depends on the shape of your data, so you will have to benchmark to find it.
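For the batching part, here is a minimal sketch of a buffering wrapper around the ParquetWriter; the class name and the TARGET_ROWS threshold are made up, and the right threshold is exactly what you would benchmark:

import pyarrow as pa

TARGET_ROWS = 128 * 1024  # hypothetical flush threshold, tune by benchmarking

class BufferedParquetWriter:
    """Collect small tables and hand them to the ParquetWriter as one larger table."""
    def __init__(self, writer, target_rows=TARGET_ROWS):
        self.writer = writer  # an already-open pq.ParquetWriter
        self.target_rows = target_rows
        self.pending = []
        self.pending_rows = 0

    def write(self, table):
        self.pending.append(table)
        self.pending_rows += len(table)
        if self.pending_rows >= self.target_rows:
            self.flush()

    def flush(self):
        if self.pending:
            self.writer.write_table(pa.concat_tables(self.pending))
            self.pending = []
            self.pending_rows = 0

In the loop above you would then call buffered.write(next_table) instead of writer.write_table(next_table), and call buffered.flush() once before writer.close().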