I used this code to build a new_df. The idea is to take all the records between DATEUPDATED and the stop time and assign them a number that I will use in a group by in the next steps, so essentially every group of records between DATEUPDATED and stop gets the same number.
from pyspark.sql.functions import lit

# Create an empty DataFrame with the same schema as df_filtered
new_df = spark.createDataFrame([], df_filtered.schema)
i = 0

# Collect the rows of df_filtered_sku1 as a list on the driver
rows = df_filtered_sku1.collect()
print('Length rows')
print(len(rows))  # 781

for row in rows:
    sku = row['Sku']
    start_time = row['DATEUPDATED']
    end_time = row['stop']
    print(sku, start_time, end_time)
    # All df_filtered records for this SKU that fall inside the interval
    df_temp = df_filtered.filter(
        (df_filtered.DATEUPDATED >= start_time)
        & (df_filtered.DATEUPDATED <= end_time)
        & (df_filtered.SKU == sku)
    )
    df_temp = df_temp.withColumn("counter", lit(i))
    # Append the temporary DataFrame to new_df
    new_df = new_df.union(df_temp)
    i += 1
    if i > 780:
        print(new_df.count())  # 2531

display(new_df)
df_filtered_sku1 has 781 rows and the final new_df has a count of 2531. But when I try to display/show the new_df DataFrame it never finishes, and when I checked the driver logs it is stuck at Allocation Failure.
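To make the intended grouping concrete, this is roughly the result I am after, expressed as a single range join instead of the loop (a sketch only, using the column names from the snippet above; generating the group id with row_number is an assumption about how the counter could be produced):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One row per (Sku, DATEUPDATED, stop) interval, each with its own group id.
# Assumption: row_number over (Sku, DATEUPDATED) plays the role of the loop counter.
intervals = (
    df_filtered_sku1
    .select("Sku", "DATEUPDATED", "stop")
    .withColumn("counter",
                F.row_number().over(Window.orderBy("Sku", "DATEUPDATED")) - 1)
)

# Tag every df_filtered record whose DATEUPDATED falls inside an interval
# for the same SKU with that interval's counter.
new_df = (
    df_filtered.alias("d")
    .join(
        intervals.alias("i"),
        (F.col("d.SKU") == F.col("i.Sku"))
        & (F.col("d.DATEUPDATED") >= F.col("i.DATEUPDATED"))
        & (F.col("d.DATEUPDATED") <= F.col("i.stop")),
        "inner",
    )
    .select("d.*", "i.counter")
)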
Cluster Specifications: