I’m trying to clean the Spotify Million Playlist Dataset down to a single CSV file. When the program reaches the concatenation step, each chunk of 100 files takes around 80 seconds, and since there are 1000 files, the whole run needs roughly 800 seconds to complete.
<code>import glob
import json
import time
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import psutil


def read_playlists(file_path):
    with open(file_path, "rb") as infile:
        data = json.load(infile)
        for playlist in data['playlists']:
            playlist_df = pd.DataFrame(playlist['tracks'],
                                       columns=['track_name', 'track_uri', 'artist_name',
                                                'artist_uri', 'album_name', 'album_uri'])
            yield playlist_df  # generator yielding one DataFrame per playlist


def clean_data():
    p = psutil.Process()
    start = time.process_time()
    read_files = sorted(glob.glob("spotify_million_playlist_dataset/data/*.json"), key=len)

    # Initialize an empty DataFrame
    df = pd.DataFrame(columns=['track_name', 'track_uri', 'artist_name',
                               'artist_uri', 'album_name', 'album_uri'])

    chunk_size = 100  # Adjust this based on available memory
    for chunk_files in zip(*[iter(read_files)] * chunk_size):  # Read files in chunks
        chunk_dfs = []
        with ThreadPoolExecutor(max_workers=None) as executor:
            # Read the playlists of each JSON file in parallel; each worker
            # consumes the generator and extends chunk_dfs with its DataFrames
            for chunk_file in chunk_files:
                executor.submit(chunk_dfs.extend, read_playlists(chunk_file))
                print(chunk_file)

        # Concatenate DataFrames for this chunk
        chunk_df = pd.concat(chunk_dfs)
        # Drop duplicates based on track_uri
        chunk_df.drop_duplicates(subset=['track_uri'], inplace=True)
        # Append to the main DataFrame
        df = pd.concat([df, chunk_df], ignore_index=True)
        print(df[-5:])
        print(f"Time taken: {time.process_time() - start:.2f} seconds")

    # Drop duplicates again (in case of duplicates across chunks)
    df.drop_duplicates(subset=['track_uri'], inplace=True)
    print(df)

    # Save cleaned DataFrame to CSV
    df.to_csv("cleaned_df.csv", index=False)
    print(f"(Peak memory usage: {p.memory_info().peak_wset / 10**9:.2f} GB)")  # peak_wset is Windows-specific
    print(f"Total time taken: {time.process_time() - start:.2f} seconds")
</code>
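For context, the per-chunk `Time taken` print above includes the JSON reading as well, so this is roughly how I isolate the cost of the concatenation step itself. It's only a sketch: the use of `time.perf_counter` for wall-clock timing and the helper name `time_chunk_concat` are mine, the rest mirrors the loop above.
<code>import time
import pandas as pd


def time_chunk_concat(chunk_dfs):
    # chunk_dfs: list of per-playlist DataFrames built for one chunk of files
    t0 = time.perf_counter()
    chunk_df = pd.concat(chunk_dfs)  # concatenate all playlist DataFrames in the chunk
    chunk_df.drop_duplicates(subset=['track_uri'], inplace=True)
    print(f"concat + dedup for this chunk: {time.perf_counter() - t0:.2f} s")
    return chunk_df
</code>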
I have attempted to reduce this runtime by storing all of the DataFrames in a single list and performing one concatenation at the end, but that greatly increases memory usage.
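For reference, that single-concatenation attempt looked roughly like this (a sketch; `collect_all` is a placeholder name and this is an approximation of what I ran, not the exact code):
<code>import json
import pandas as pd


def collect_all(files):
    # Keep every per-playlist DataFrame from every file in one list,
    # then concatenate once at the end instead of once per chunk.
    all_dfs = []
    for file_path in files:
        with open(file_path, "rb") as infile:
            data = json.load(infile)
        for playlist in data['playlists']:
            all_dfs.append(pd.DataFrame(playlist['tracks'],
                                        columns=['track_name', 'track_uri', 'artist_name',
                                                 'artist_uri', 'album_name', 'album_uri']))
    df = pd.concat(all_dfs, ignore_index=True)  # single concatenation at the end
    df.drop_duplicates(subset=['track_uri'], inplace=True)
    return df
</code>
Running this over the same sorted file list avoids the repeated per-chunk `pd.concat`, but the full list of per-playlist DataFrames has to sit in memory before the final concatenation, which is where the memory blow-up comes from.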