I have two data frames: df_selected and df_filtered_mins_60
df_filtered_mins_60.columns
Output: ["CPSKU", "start_timestamp", "stop_timestamp"]
df_selected.columns
Output: ["DATEUPDATED", "DATE", "HOUR", "CPSKU", "BB_Status",
"ActivePrice", "PrevPrice", "MinPrice", "AsCost",
"MinMargin", "CPT", "Comp_Price", "AP_MSG"]
df_selected.count()
Output: 7,816,521
df_filtered_mins_60.count()
Output: 112,397
What I want to implement is:
Iterate through df_filtered_mins_60 and, for each row, take:
start_time = start_timestamp
stop_time = stop_timestamp
sku = CPSKU
Apply the conditions below on df_selected WHEN:
DATEUPDATED is equal to or between start_time and stop_time
AND CPSKU = sku
THEN assign a constant number i to all the rows satisfying this condition. Continue doing this until the end of the rows in df_filtered_mins_60, incrementing i = i + 1 after each update. (A join-based sketch of this logic follows below.)
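For reference, one way to express this tagging without a Python loop is a range join: give each row of df_filtered_mins_60 its own id and join it to df_selected on the SKU and the time range. This is only a minimal sketch against the column names used in the sample data further down; the row_number ordering and the column name "counter" are my assumptions, not part of the original code.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Assumption: one id per window row, generated with row_number (the ordering is arbitrary here)
win = Window.orderBy("CPSKU", "start_timestamp")
windows = df_filtered_mins_60.withColumn("counter", F.row_number().over(win))

# Tag df_selected rows by joining on SKU and time range instead of looping
tagged = (
    df_selected.join(
        windows,
        (df_selected["CPSKU"] == windows["CPSKU"])
        & (df_selected["DATEUPDATED"] >= windows["start_timestamp"])
        & (df_selected["DATEUPDATED"] <= windows["stop_timestamp"]),
        "left",
    )
    .drop(windows["CPSKU"])
)
display(tagged)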
The code I wrote is given below. It never finishes; it gets stuck somewhere and keeps running for hours until I forcefully stop it.
from pyspark.sql.functions import lit, when

i = 1
df_selected = df_selected.withColumn("counter", lit(0))

# Iterate through each row of df_filtered_mins_60
for row in df_filtered_mins_60.collect():
    sku = row['CPSKU']
    start_time = row['start_timestamp']
    stop_time = row['stop_timestamp']
    # Apply conditions on df_selected and update the "counter" column
    df_selected = df_selected.withColumn("counter",
        when((df_selected.DATEUPDATED >= start_time) &
             (df_selected.DATEUPDATED <= stop_time) &
             (df_selected.CPSKU == sku),
             lit(i)).otherwise(df_selected.counter))
    i += 1

# Display the updated df_selected DataFrame with the "counter" column
display(df_selected)
I am assigning counters because I need the set of rows from df_selected that fall within certain time windows for each SKU, and this window information is in df_filtered_mins_60. After assigning a counter I need to perform aggregates on other columns of df_selected; essentially, for each window, I need some insight into what was happening during that period (a sketch of such a per-window aggregation is shown below).
I need to get the right PySpark code to run on Databricks.
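To illustrate the downstream step, here is a minimal sketch of per-window aggregates computed from the tagged result in the earlier sketch; the specific aggregates (row count, average ActivePrice, minimum MinPrice) are placeholders, not requirements.

from pyspark.sql import functions as F

# Keep only rows that fell inside a window, then aggregate per window id and SKU
window_stats = (
    tagged
    .where(F.col("counter").isNotNull())
    .groupBy("counter", "CPSKU")
    .agg(
        F.count("*").alias("row_count"),                 # events in the window
        F.avg("ActivePrice").alias("avg_active_price"),  # example aggregate
        F.min("MinPrice").alias("min_price"),            # example aggregate
    )
)
display(window_stats)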
Generate Sample Data:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
# Initialize SparkSession (on Databricks, `spark` already exists)
spark = SparkSession.builder \
    .appName("Create DataFrame") \
    .getOrCreate()
schema = StructType([
StructField("DATEUPDATED", StringType(), True),
StructField("DATE", StringType(), True),
StructField("HOUR", IntegerType(), True),
StructField("CPSKU", StringType(), True),
StructField("BB_Status", IntegerType(), True),
StructField("ActivePrice", DoubleType(), True),
StructField("PrevPrice", DoubleType(), True),
StructField("MinPrice", DoubleType(), True),
StructField("AsCost", DoubleType(), True),
StructField("MinMargin", DoubleType(), True),
StructField("CPT", DoubleType(), True),
StructField("Comp_Price", DoubleType(), True)
])
data=[('2024-01-01T19:45:39.151+00:00','2024-01-01',0,'MSAN10115836',0,14.86,14.86,14.86,12.63,0.00,13.90,5.84) ,
('2024-01-01T19:55:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
('2024-01-01T20:35:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
('2024-01-15T12:55:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10) ,
('2024-01-15T13:25:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10) ,
('2024-01-15T13:35:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10) ,
('2024-01-15T13:51:09.574+00:00','2024-01-01',1,'PFXNDDF4OX',1,20.16,18.16,10.56,26.85,-199.00,18.16,34.10) ,
('2024-01-15T07:28:48.265+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
('2024-01-15T07:50:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
('2024-01-15T07:52:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26)]
df_selected = spark.createDataFrame(data, schema=schema)
df_selected = df_selected.withColumn("DateUpdated", to_timestamp(df_selected["DATEUPDATED"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
display(df_selected)
Second DataFrame:
schema = StructType([
StructField("CPSKU", StringType(), True),
StructField("start_timestamp", StringType(), True),
StructField("stop_timestamp", StringType(), True)
])
data_2=[('MSAN10115836','2024-01-01T19:45:39.151+00:00','2024-01-01T20:35:10.904+00:00'),
('MSAN10115836','2024-01-08T06:04:16.484+00:00','2024-01-08T06:42:14.912+00:00'),
('DEWNDCB135C','2024-01-15T07:28:48.265+00:00','2024-01-15T07:52:32.412+00:00'),
('DEWNDCB135C','2024-01-15T11:37:56.698+00:00','2024-01-15T12:35:09.693+00:00'),
('PFXNDDF4OX','2024-01-15T12:55:18.528+00:00','2024-01-15T13:51:09.574+00:00'),
('PFXNDDF4OX','2024-01-15T19:25:10.150+00:00','2024-01-15T20:24:36.385+00:00')]
df_filtered_mins_60 = spark.createDataFrame(data_2, schema=schema)
df_filtered_mins_60 = df_filtered_mins_60.withColumn("start_timestamp", to_timestamp(df_filtered_mins_60["start_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
df_filtered_mins_60 = df_filtered_mins_60.withColumn("stop_timestamp", to_timestamp(df_filtered_mins_60["stop_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
display(df_filtered_mins_60)