I’m running some PySpark code like this to help clean up an input DataFrame by computing DataFrames containing the IDs I’d like to remove:
from pyspark import StorageLevel

# each of these DFs on its own has the same 2 cols (col_a, col_b), and maybe 5-10K rows each
df1 = evalFunc1(input_df, ... others)
df2 = evalFunc2(input_df, ... others)
df3 = evalFunc3(input_df, ... others)
# ...etc, more

unioned_df = (
    df1.union(df2).union(df3)
    # the rest of the unions
)
unioned_df.persist(StorageLevel.MEMORY_AND_DISK)

print(f"all row offers before filters: {input_df.count()}")  # maybe 150K max

cleaned_input_df = input_df.join(
    unioned_df,
    on=["col_a", "col_b"],
    how="left_anti",
)
cleaned_input_df.persist(StorageLevel.MEMORY_AND_DISK)
after_filter_count = cleaned_input_df.count()
print(f"rows after filter: {after_filter_count}")  # usually about 30K, so a big reduction
Normally the above takes about 2-3 minutes to execute; it’s a very small volume of data. However, my logic changed, and I realized I needed to move evalFunc3 down so it runs in a specific order: it does a similar thing, except it has to execute AFTER the other functions have run, against the already-filtered data. So the above turns into this:
df1 = evalFunc1(input_df, ... others)
df2 = evalFunc2(input_df, ... others)
# df3 = evalFunc3(input_df, ... others)  # no longer executing here
# ...etc, more

unioned_df = (
    df1.union(df2)
    # the rest of the unions (except not df3)
)
unioned_df.persist(StorageLevel.MEMORY_AND_DISK)

print(f"all row offers before filters: {input_df.count()}")  # still approx 150K max

cleaned_input_df = input_df.join(
    unioned_df,
    on=["col_a", "col_b"],
    how="left_anti",
)
cleaned_input_df.persist(StorageLevel.MEMORY_AND_DISK)
after_filter_count = cleaned_input_df.count()
print(f"rows after filter: {after_filter_count}")  # usually about 30K, so a big reduction

# now execute evalFunc3 with the now-smaller cleaned_input_df
df3 = evalFunc3(cleaned_input_df, ... others)
print("resulting rowcount", df3.count())  # force evaluation - this now takes 13 minutes

final_cleaned_input_df = cleaned_input_df.join(
    df3,
    on=["col_a", "col_b"],
    how="left_anti",
)
The difference is that the evaluation of df3 now takes more than 4 times as long as the entire original run, even though I’m doing the same logic on less data. The actual code’s query plan is enormous in both scenarios, something like 70K lines or more, so I can’t really use it to figure out what’s going on; all I can say is that the first version runs quickly, as expected. The data is never especially large, always under 200K rows, and no DataFrame in the above has more than 15 columns, all simple text and ints.
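In case it helps, this is roughly how I’m gauging the plan size rather than reading it in the notebook (a rough sketch; capture_plan is just a little helper of mine, not a library function). It just captures the explain() output and counts lines:

import io
from contextlib import redirect_stdout

def capture_plan(df, mode: str = "formatted") -> str:
    # df.explain() prints to stdout, so redirect it into a string buffer
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain(mode=mode)
    return buf.getvalue()

plan_text = capture_plan(cleaned_input_df)
print(len(plan_text.splitlines()), "plan lines")  # tens of thousands of lines in both versions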
What causes the explosion in run time when my code is structured like the second code block?
I’m running this in a PySpark notebook on Databricks in Azure, on a Databricks Runtime 14.3 cluster with Photon enabled. I’m really scratching my head here. The cluster is also very generously sized for this workload: a single-node cluster, but with 128 GB of memory, so that should be plenty. And again, the first code block runs fine. Any idea what’s causing the increase? Thanks!