This issue may be related to https://github.com/pola-rs/polars/issues/9842 and to the question "How to process Python Polars LazyFrame in batches".
My setup is:

    import pathlib

    import polars as pl

    input = pathlib.Path("input.csv")  # 300k lines
    output = pathlib.Path("output.csv")

    id_col_name = "id"
    generated_data_col_name = ...  # name of the temporary struct column
    any_value_column_is_null = ...  # polars expression selecting rows that still need processing
    schema_as_dict = ...  # polars schema as a dict

    def mapper(row_id):
        # expensive computation, can fail sometimes
        pass

    def process_unprocessed_rows_in_batch(df: pl.DataFrame) -> pl.DataFrame:
        additional_data = (
            df.filter(any_value_column_is_null)
            .with_columns(
                pl.col(id_col_name)
                .map_elements(
                    mapper,
                    return_dtype=pl.Struct(schema_as_dict),
                )
                .alias(generated_data_col_name)
            )
            .with_columns(pl.col(generated_data_col_name).struct.unnest())
            .drop(generated_data_col_name)
        )
        return df.update(additional_data, on=id_col_name, how="left")

    df = pl.scan_csv(input, schema=schema_as_dict).map_batches(
        process_unprocessed_rows_in_batch, streamable=True
    )
    df.sink_csv(output, maintain_order=False)
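
For reference, mapper is expected to return a dict whose keys match schema_as_dict, so map_elements yields a Struct column that struct.unnest can expand back into regular columns. A tiny self-contained illustration of that round-trip (the schema, column names, and values here are made up):

    import polars as pl

    toy_schema = {"id": pl.Int64, "value": pl.Float64}  # hypothetical schema

    def toy_mapper(row_id: int) -> dict:
        # stand-in for the expensive computation
        return {"id": row_id, "value": row_id * 1.5}

    out = (
        pl.DataFrame({"id": [1, 2, 3]})
        .with_columns(
            pl.col("id")
            .map_elements(toy_mapper, return_dtype=pl.Struct(toy_schema))
            .alias("generated")
        )
        .with_columns(pl.col("generated").struct.unnest())
        .drop("generated")
    )
    # out now has the columns "id" and "value", one row per input id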
The plan is:

    STREAMING:
      OPAQUE_PYTHON
        Csv SCAN [snippet-dataset.csv]
        PROJECT */4 COLUMNS
In this setup I want best-effort processing: if mapper fails, I want the already-processed results to be persisted.
I assumed that streaming is executed in batches, so already-processed batches would be persisted to the output and, in case of a failure, I would lose only the current batch.
But that does not seem to be the case: if processing fails, the intermediate output is empty.
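
To make the intent concrete, the behaviour I am after is roughly what this hand-rolled loop would give me (a sketch only; chunk_size and the per-chunk output files are hypothetical, and I would much rather get this from the streaming engine than write it myself):

    import polars as pl

    chunk_size = 10_000  # arbitrary
    lf = pl.scan_csv(input, schema=schema_as_dict)
    n_rows = lf.select(pl.len()).collect().item()

    for offset in range(0, n_rows, chunk_size):
        chunk = lf.slice(offset, chunk_size).collect()
        try:
            processed = process_unprocessed_rows_in_batch(chunk)
        except Exception:
            break  # earlier chunks are already on disk; only this one is lost
        # persist each chunk as soon as it is done so a later failure cannot erase it
        processed.write_csv(output.with_name(f"output.part{offset}.csv"))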
I tried adjusting sink_csv(batch_size) and pl.Config.set_streaming_chunk_size (see the snippet below), but neither has any effect.
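
Concretely, this is roughly what I tried; the chunk and batch sizes below are arbitrary values I experimented with, not anything prescribed:

    import polars as pl

    # shrink the streaming chunks, hoping each one is flushed to the file as it completes
    pl.Config.set_streaming_chunk_size(1_000)

    df = pl.scan_csv(input, schema=schema_as_dict).map_batches(
        process_unprocessed_rows_in_batch, streamable=True
    )
    # smaller write batches for the CSV sink
    df.sink_csv(output, batch_size=1_000, maintain_order=False)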