To preface, I am still learning Python, as well as Polars.
I am working with individual-contribution data from citizens to political candidates between 2010 and 2022. The data comes as multiple text files, each containing two years' worth of individual financial contributions. Using Polars' scan_csv I load the text files into a single lazy frame; the result is a 156,000,000 row x 17 column LazyFrame.
The end goal is to clean and manipulate this lazy frame efficiently so that it can eventually be linked to another DataFrame using probabilistic and deterministic string matching.
In order to eventually link the data, I must convert all strings to lowercase and use regex to split the contributors' names into components. I understand this is an intensive process, and that I might have some shortcomings in my Python code. The query seems to stall at the final collect:
df = df.collect(streaming=True)
Below is the entirety of my code:
import polars as pl
# Define the schema for the data
schema = {
"Cycle": pl.Int16,
"FECTransID": pl.Int64,
"ContribID": pl.Utf8,
"Contrib": pl.Utf8,
"RecipID": pl.Utf8,
"Orgname": pl.Utf8,
"UltOrg": pl.Utf8,
"RealCode": pl.Utf8,
"Date": pl.Date,
"Amount": pl.Int32,
"Street": pl.Utf8,
"City": pl.Utf8,
"State": pl.Categorical,
"Zip": pl.Int32,
"RecipCode": pl.Utf8,
"Type": pl.Utf8,
"CmteID": pl.Utf8,
"OtherID": pl.Utf8,
"Gender": pl.Categorical,
"Microfilm": pl.Utf8,
"Occupation": pl.Utf8,
"Employer": pl.Utf8,
"Source": pl.Utf8
}
# Load data with specified schema
df = pl.scan_csv(
r'C:\PathToYourFile\indivs*.txt',
separator=',',
quote_char='|',
schema=schema,
encoding='utf8-lossy',
ignore_errors=True
).drop(["Microfilm", "OtherID", "Source", "Date", "Street"])
# List of string columns to convert to lowercase
string_columns = [
"ContribID", "Contrib", "RecipID", "Orgname", "UltOrg",
"RealCode", "City", "RecipCode", "Type", "CmteID",
"Occupation", "Employer"
]
# Convert string columns to lowercase
df = df.with_columns(pl.col(string_columns).str.to_lowercase())
# Collect column names and create a renaming map to lowercase names
rename_map = {col: col.lower() for col in df.collect_schema().names()}
# Rename the columns
df = df.rename(rename_map)
# Add a unique identifier column
df = df.with_columns(unique_id=pl.int_range(pl.len()))
# Define regex patterns for extracting name components
regex_last = r'([^,]+)'
regex_middle = r'\b([A-Za-z])\b\s*$'
regex_first = r',\s+([A-Za-z\s]+)'
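# e.g. "smith, john a" -> last "smith", middle "a",
# and first "john a", trimmed to "john" by the cleanup below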
# Extract name components and clean up first name
df = df.with_columns([
pl.col("contrib").str.extract(regex_last, group_index=1).alias("last"),
pl.col("contrib").str.extract(regex_middle, group_index=1).alias("middle"),
pl.col("contrib").str.extract(regex_first, group_index=1).alias("first")
]).with_columns(
pl.col("first").str.replace(r"(s+[A-Za-z].?)s*$", "").alias("first")
)
# Concatenate names into a full name
df = df.with_columns(
pl.when(pl.col("middle").is_not_null())
.then(pl.col("first") + " " + pl.col("middle") + ". " + pl.col("last"))
.otherwise(pl.col("first") + " " + pl.col("last"))
.alias("full")
)
def remove_all_non_alphanumeric(col):
# Remove non-alphanumeric characters from the column
return col.str.replace_all("[^a-zA-Z0-9]", "")
# Apply the cleaning function to specific columns
df = df.with_columns([
remove_all_non_alphanumeric(pl.col("employer")).alias("employer"),
remove_all_non_alphanumeric(pl.col("zip")).alias("zip"),
remove_all_non_alphanumeric(pl.col("city")).alias("city"),
remove_all_non_alphanumeric(pl.col("ultorg")).alias("ultorg"),
remove_all_non_alphanumeric(pl.col("orgname")).alias("orgname"),
])
# Collect the final DataFrame, enabling streaming to handle large data
df = df.collect(streaming=True)
My available CPU, RAM, and disk-write capacity drop at first, which tells me the code is actively working. After a few minutes they climb back up, suggesting the load has been lifted. Maybe I am misunderstanding, but I believe this means not much is actually happening anymore, which leads me to believe my code is hung up or looping in a way I do not understand.
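One way to check whether the plan actually runs in the streaming engine is to print the optimized plan before collecting. A minimal sketch, assuming a Polars version where LazyFrame.explain accepts streaming=True; the tiny frame here is a stand-in for the real 156M-row scan:

import polars as pl

# Tiny stand-in for the real lazy frame built from the text files
lf = pl.LazyFrame({"contrib": ["smith, john a", "doe, jane"]})

lf = lf.with_columns(
    pl.col("contrib").str.to_lowercase(),
    unique_id=pl.int_range(pl.len()),  # candidate streaming blocker
)

# Parts of the printed plan outside the streaming section fall back
# to the in-memory engine.
print(lf.explain(streaming=True))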
df.with_row_index(offset=1)
worked for me too; without it, the query just hangs. But can someone explain why it works this way? The documentation warns that using this method degrades performance.
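For reference, swapping the index line from the question for the built-in row index would look like this (a sketch; with_row_index attaches the counter lazily, and offset=1 just starts the count at 1 instead of 0):

# Instead of: df = df.with_columns(unique_id=pl.int_range(pl.len()))
df = df.with_row_index(name="unique_id", offset=1)

As I understand it, the performance warning in the docs is about the row index blocking query optimizations such as predicate pushdown, not about a hang.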
I limited the size of my frame to 50 million rows and received a string-caching error. I quickly realized that adding
pl.enable_string_cache()
pl.Config.set_streaming_chunk_size(2000000)
to the start of my code, and eliminating the line that created the index, solved my issue. Polars does not seem well equipped to create row indexes at this scale.
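Put together, a sketch of this fix (lf here stands for the lazy frame built as in the question, minus the unique_id column):

import polars as pl

# One global string cache, so Categorical values ("State", "Gender")
# from different files and streaming chunks share one dictionary.
pl.enable_string_cache()

# Smaller streaming chunks lower peak memory at some throughput cost.
pl.Config.set_streaming_chunk_size(2000000)

# ... build lf exactly as in the question, without the unique_id column ...

df = lf.collect(streaming=True)

# If a unique identifier is still needed, add it eagerly after collecting.
df = df.with_row_index(name="unique_id", offset=1)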