I have a large, jumbled CSV file (about 10 million records, roughly 4 GB). I'm using the Python script below to grab data from it, but when I run it against the full file, the error shown at the bottom is thrown. The same script, without chunking, works perfectly on a small test file. How can I get rid of this error and process the whole data file?
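For context, a row looks roughly like this (these values are invented since the real data is sensitive; the point is that most fields can land in any column, which is why the script detects each field by pattern):

"John","Doe","62701","M","123 Main St","Springfield","123456789",...,"5551234567",...,"Acme Corp",...,"web"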
import pandas as pd
import re
from concurrent.futures import ProcessPoolExecutor, as_completed
# Define column names for the final DataFrame
columns = [
    'firstname', 'lastname', 'gender', 'address', 'city', 'state', 'email',
    'zip', 'ssn', 'phone', 'ip4', 'country', 'company', 'origin'
]
# List of US state abbreviations
us_states = set([
    'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'ID',
    'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS',
    'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'OH', 'OK',
    'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV',
    'WI', 'WY'
])
# Function to identify and rearrange the jumbled columns
def identify_columns(row):
    data = {
        'gender': None, 'address': None, 'city': None, 'state': None,
        'email': None, 'zip': None, 'ssn': None, 'ip4': None, 'country': None,
        'company': None, 'origin': None
    }
    for col in row.index:
        value = str(row[col]).strip('"')
        if re.match(r'^\d{9}$', value):  # SSN pattern
            data['ssn'] = value
        elif re.match(r'^\d{5}$', value):  # ZIP code pattern
            data['zip'] = value
        elif re.match(r'^\d{10}$', value):  # Phone number pattern
            data['phone'] = value
        elif re.match(r'^\w+@\w+\.\w+$', value):  # Email pattern
            data['email'] = value
        elif value in ['M', 'F']:  # Gender pattern
            data['gender'] = value
        elif value in us_states:  # State pattern
            data['state'] = value
        elif re.match(r'^\d{1,3}(\.\d{1,3}){3}$', value):  # IP address pattern
            data['ip4'] = value
        elif value == 'US':  # Country pattern
            data['country'] = value
        elif col == 18:  # Company column
            data['company'] = value
        elif col == 20:  # Origin column
            data['origin'] = value
        elif 2 <= col <= 11:  # Address columns
            if not data['address'] and value:  # Address not set yet and value is non-empty
                data['address'] = value
            elif not data['city'] and ' ' in value and value.replace(' ', '').isalpha():  # City not set yet; value contains a space and is otherwise all letters
                data['city'] = value
    return pd.Series(data)
def process_chunk(chunk):
    df_cleaned = pd.DataFrame()
    df_cleaned['firstname'] = chunk.iloc[:, 0].str.strip('"')
    df_cleaned['lastname'] = chunk.iloc[:, 1].str.strip('"')
    df_cleaned['phone'] = chunk.iloc[:, 16].str.strip('"')
    df_jumbled = chunk.drop([0, 1, 16], axis=1)
    df_jumbled_cleaned = df_jumbled.apply(identify_columns, axis=1)
    # Combine the cleaned columns with the jumbled columns
    df_combined = pd.concat([df_cleaned.reset_index(drop=True),
                             df_jumbled_cleaned.reset_index(drop=True)], axis=1)
    # Ensure columns are in the correct order
    df_combined = df_combined[columns]
    return df_combined
# File paths
input_file = 'test.csv'
output_file = 'cleaned.csv'
error_file = 'error_chunks.csv'
# Read the CSV file in chunks
chunksize = 10000
chunks = pd.read_csv(input_file, header=None, dtype=str, chunksize=chunksize, error_bad_lines=False)
# Process chunks in parallel
results = []
errors = []
max_workers = 4 # Adjust based on your system's capability
with ProcessPoolExecutor(max_workers=max_workers) as executor:
    futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks}
    for future in as_completed(futures):
        chunk = futures[future]
        try:
            result = future.result()
            results.append(result)
        except Exception as e:
            print("Error processing chunk: {}".format(e))
            errors.append(chunk)
# Concatenate all results
if results:
    final_df = pd.concat(results, ignore_index=True)
    # Save the cleaned DataFrame to a new CSV file
    final_df.to_csv(output_file, index=False, quoting=1)  # quoting=1 is csv.QUOTE_ALL (quote every field)
else:
    print("No valid data to write.")
# Save error chunks for further analysis
if errors:
    error_df = pd.concat(errors, ignore_index=True)
    error_df.to_csv(error_file, index=False, quoting=1)
    print("Error chunks saved to {}".format(error_file))
else:
    print("No error chunks to save.")
Error
Traceback (most recent call last):
File "sort-large.py", line 100, in <module>
final_df = pd.concat(results, ignore_index=True)
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/concat.py", line 287, in concat
return op.get_result()
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/concat.py", line 503, in get_result
mgrs_indexers, self.new_axes, concat_axis=self.bm_axis, copy=self.copy,
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/internals/concat.py", line 54, in concatenate_block_managers
for placement, join_units in concat_plan:
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/internals/concat.py", line 561, in _combine_concat_plans
raise ValueError("Plan shapes are not aligned")
ValueError: Plan shapes are not aligned
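In case it helps in answering: my next step is to compare the column layout of every processed chunk right before the concat, with something like this (diagnostic sketch only, using the same `results` list built in the script above):

for i, result in enumerate(results):
    # If chunks disagree on width or column names, pd.concat should be
    # what raises the "Plan shapes are not aligned" error seen above
    print(i, result.shape, list(result.columns))

Is that the right direction, or is something else going on?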