I am benchmarking the imputation of null values with zeroes in a large dataframe saved as a Parquet file. The steps of the test are:
- Read a ~800 MB Parquet file which stores a dataframe with 10,000 × 10,000 dimensions.
- Replace any null values with zeroes
- Save the filled dataframe into a parquet file
I am doing the same test in Python/Pandas and Rust/Polars. I was hoping that Polars in Rust would be much faster than Pandas in Python, but the total load/process/store duration in Rust is ~20 seconds, while in Pandas it takes only ~16 seconds. As far as I can see, neither solution uses parallelization, but I was still expecting Rust to be much faster in this case. Is there anything I can do to maximize the processing speed in Rust?
The code:
<code>Some(Commands::Impute { input_path, output_path }) => {
    // Time the full load -> impute -> store pipeline; I/O dominates here,
    // so the measurement is mostly disk + (de)compression, not compute.
    let start = Instant::now();
    let df = polars2::read_parquet(&input_path);
    // "Error" told us nothing — say what actually failed.
    let mut imputed_df = polars2::fill_nan_with_zeroes(&df)
        .expect("failed to impute null values in dataframe");
    polars2::write_parquet(&mut imputed_df, &output_path);
    let duration = start.elapsed();
    println!("Total duration: {:.2} seconds", duration.as_secs_f64());
}
// utilities for working with polars dataframes
//
use polars::prelude::*;
use std::fs::File;
//use polars::df;
//read in a parquet file
/// Reads the Parquet file at `path` into an eagerly materialized `DataFrame`.
///
/// # Panics
/// Panics if the file cannot be opened or is not valid Parquet; the panic
/// message includes the offending path and underlying error for debugging.
pub fn read_parquet(path: &str) -> DataFrame {
    // Open file; include the path in the message (the bare expect() lost it).
    let file = File::open(path)
        .unwrap_or_else(|e| panic!("Failed to open file {}: {}", path, e));
    // Read to DataFrame and return.
    ParquetReader::new(file)
        .finish()
        .unwrap_or_else(|e| panic!("Failed to read Parquet file {}: {}", path, e))
}
// write dataframe to parquet file
/// Writes `df` to a Parquet file at `path`.
///
/// `df` is `&mut` because `ParquetWriter::finish` may rechunk the frame
/// in place before writing.
///
/// # Panics
/// Panics if the file cannot be created or the write fails; the panic
/// message includes the offending path and underlying error.
pub fn write_parquet(df: &mut DataFrame, path: &str) {
    // Create the output file; keep the path in the error for debugging.
    let file = File::create(path)
        .unwrap_or_else(|e| panic!("could not create output file {}: {}", path, e));
    // Write the dataframe; finish() returns the byte count, which we discard.
    ParquetWriter::new(file)
        .finish(df)
        .unwrap_or_else(|e| panic!("Failed to write dataframe to {}: {}", path, e));
}
//print "n" rows of a dataframe
/// Prints the first `n` rows of `df` using debug formatting.
pub fn print_df(df: &DataFrame, n: usize) {
    let preview = df.head(Some(n));
    println!("{:?}", preview);
}
//print the schema of a dataframe
/// Prints the schema (column names and dtypes) of `df`.
pub fn print_schema(df: &DataFrame) {
    let schema = df.schema();
    println!("{:?}", schema);
}
//print the shape of a dataframe
/// Prints the `(rows, columns)` shape of `df`.
pub fn print_shape(df: &DataFrame) {
    let dims = df.shape();
    println!("{:?}", dims);
}
/// Replaces NaN with zeroes.
pub fn fill_nan_with_zeroes(df: &DataFrame) -> PolarsResult<DataFrame> {
let mut transformed_df = df.clone();
for idx in 0..transformed_df.width() {
transformed_df.try_apply_at_idx(idx, |series| {
if let Ok(ca) = series.f64() {
let ca_filled = ca.fill_null_with_values(0.0);
ca_filled
} else {
series.f64().cloned()
}
})?;
}
Ok(transformed_df)
}
</code>
<code>Some(Commands::Impute { input_path, output_path }) => {
    // Time the full load -> impute -> store pipeline; I/O dominates here,
    // so the measurement is mostly disk + (de)compression, not compute.
    let start = Instant::now();
    let df = polars2::read_parquet(&input_path);
    // "Error" told us nothing — say what actually failed.
    let mut imputed_df = polars2::fill_nan_with_zeroes(&df)
        .expect("failed to impute null values in dataframe");
    polars2::write_parquet(&mut imputed_df, &output_path);
    let duration = start.elapsed();
    println!("Total duration: {:.2} seconds", duration.as_secs_f64());
}
// utilities for working with polars dataframes
//
use polars::prelude::*;
use std::fs::File;
//use polars::df;
//read in a parquet file
/// Reads the Parquet file at `path` into an eagerly materialized `DataFrame`.
///
/// # Panics
/// Panics if the file cannot be opened or is not valid Parquet; the panic
/// message includes the offending path and underlying error for debugging.
pub fn read_parquet(path: &str) -> DataFrame {
    // Open file; include the path in the message (the bare expect() lost it).
    let file = File::open(path)
        .unwrap_or_else(|e| panic!("Failed to open file {}: {}", path, e));
    // Read to DataFrame and return.
    ParquetReader::new(file)
        .finish()
        .unwrap_or_else(|e| panic!("Failed to read Parquet file {}: {}", path, e))
}
// write dataframe to parquet file
/// Writes `df` to a Parquet file at `path`.
///
/// `df` is `&mut` because `ParquetWriter::finish` may rechunk the frame
/// in place before writing.
///
/// # Panics
/// Panics if the file cannot be created or the write fails; the panic
/// message includes the offending path and underlying error.
pub fn write_parquet(df: &mut DataFrame, path: &str) {
    // Create the output file; keep the path in the error for debugging.
    let file = File::create(path)
        .unwrap_or_else(|e| panic!("could not create output file {}: {}", path, e));
    // Write the dataframe; finish() returns the byte count, which we discard.
    ParquetWriter::new(file)
        .finish(df)
        .unwrap_or_else(|e| panic!("Failed to write dataframe to {}: {}", path, e));
}
//print "n" rows of a dataframe
/// Prints the first `n` rows of `df` using debug formatting.
pub fn print_df(df: &DataFrame, n: usize) {
    let preview = df.head(Some(n));
    println!("{:?}", preview);
}
//print the schema of a dataframe
/// Prints the schema (column names and dtypes) of `df`.
pub fn print_schema(df: &DataFrame) {
    let schema = df.schema();
    println!("{:?}", schema);
}
//print the shape of a dataframe
/// Prints the `(rows, columns)` shape of `df`.
pub fn print_shape(df: &DataFrame) {
    let dims = df.shape();
    println!("{:?}", dims);
}
/// Replaces NaN with zeroes.
pub fn fill_nan_with_zeroes(df: &DataFrame) -> PolarsResult<DataFrame> {
let mut transformed_df = df.clone();
for idx in 0..transformed_df.width() {
transformed_df.try_apply_at_idx(idx, |series| {
if let Ok(ca) = series.f64() {
let ca_filled = ca.fill_null_with_values(0.0);
ca_filled
} else {
series.f64().cloned()
}
})?;
}
Ok(transformed_df)
}
</code>
Some(Commands::Impute { input_path, output_path }) => {
    // Time the full load -> impute -> store pipeline; I/O dominates here,
    // so the measurement is mostly disk + (de)compression, not compute.
    let start = Instant::now();
    let df = polars2::read_parquet(&input_path);
    // "Error" told us nothing — say what actually failed.
    let mut imputed_df = polars2::fill_nan_with_zeroes(&df)
        .expect("failed to impute null values in dataframe");
    polars2::write_parquet(&mut imputed_df, &output_path);
    let duration = start.elapsed();
    println!("Total duration: {:.2} seconds", duration.as_secs_f64());
}
// utilities for working with polars dataframes
//
use polars::prelude::*;
use std::fs::File;
//use polars::df;
//read in a parquet file
/// Reads the Parquet file at `path` into an eagerly materialized `DataFrame`.
///
/// # Panics
/// Panics if the file cannot be opened or is not valid Parquet; the panic
/// message includes the offending path and underlying error for debugging.
pub fn read_parquet(path: &str) -> DataFrame {
    // Open file; include the path in the message (the bare expect() lost it).
    let file = File::open(path)
        .unwrap_or_else(|e| panic!("Failed to open file {}: {}", path, e));
    // Read to DataFrame and return.
    ParquetReader::new(file)
        .finish()
        .unwrap_or_else(|e| panic!("Failed to read Parquet file {}: {}", path, e))
}
// write dataframe to parquet file
/// Writes `df` to a Parquet file at `path`.
///
/// `df` is `&mut` because `ParquetWriter::finish` may rechunk the frame
/// in place before writing.
///
/// # Panics
/// Panics if the file cannot be created or the write fails; the panic
/// message includes the offending path and underlying error.
pub fn write_parquet(df: &mut DataFrame, path: &str) {
    // Create the output file; keep the path in the error for debugging.
    let file = File::create(path)
        .unwrap_or_else(|e| panic!("could not create output file {}: {}", path, e));
    // Write the dataframe; finish() returns the byte count, which we discard.
    ParquetWriter::new(file)
        .finish(df)
        .unwrap_or_else(|e| panic!("Failed to write dataframe to {}: {}", path, e));
}
//print "n" rows of a dataframe
/// Prints the first `n` rows of `df` using debug formatting.
pub fn print_df(df: &DataFrame, n: usize) {
    let preview = df.head(Some(n));
    println!("{:?}", preview);
}
//print the schema of a dataframe
/// Prints the schema (column names and dtypes) of `df`.
pub fn print_schema(df: &DataFrame) {
    let schema = df.schema();
    println!("{:?}", schema);
}
//print the shape of a dataframe
/// Prints the `(rows, columns)` shape of `df`.
pub fn print_shape(df: &DataFrame) {
    let dims = df.shape();
    println!("{:?}", dims);
}
/// Replaces NaN with zeroes.
pub fn fill_nan_with_zeroes(df: &DataFrame) -> PolarsResult<DataFrame> {
let mut transformed_df = df.clone();
for idx in 0..transformed_df.width() {
transformed_df.try_apply_at_idx(idx, |series| {
if let Ok(ca) = series.f64() {
let ca_filled = ca.fill_null_with_values(0.0);
ca_filled
} else {
series.f64().cloned()
}
})?;
}
Ok(transformed_df)
}