I have a time series dataset and want to add a column holding the last reported (non-null) value. I think I have this part figured out, using a combination of `lag` and `last`.
I would also like to know the timestamp of that last reported (non-null) value. Note: I never expect `timestamp_ms` to be null.
Sample Data
from pyspark.sql import Row, SparkSession

# Reuses the active session if one already exists (e.g. in a notebook)
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(timestamp_ms=1672531200000, val='19'),
    Row(timestamp_ms=1672532100000, val='20'),
    Row(timestamp_ms=1672533000000, val=None),
    Row(timestamp_ms=1672533900000, val='22'),
    Row(timestamp_ms=1672534800000, val=None),
    Row(timestamp_ms=1672535700000, val=None),
    Row(timestamp_ms=1672536600000, val='25'),
    Row(timestamp_ms=1672537500000, val='20'),
    Row(timestamp_ms=1672538400000, val='27')
])
df.show()
Sample Code
The code below computes the last non-null lagged value and attempts to return the timestamp at which that value was reported.
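from pyspark.sql import Window, functions as F

# Single unpartitioned window ordered by time
w = Window.partitionBy().orderBy("timestamp_ms")

df_lag = (
    df
    # Value from the immediately preceding row (may be null)
    .withColumn("lag_prev_val", F.lag("val").over(w))
    # Most recent non-null lagged value up to the current row
    .withColumn("last_lag_prev_val", F.last("lag_prev_val", ignorenulls=True).over(w))
    # Timestamp of the immediately preceding row (not what I want)
    .withColumn("last_lag_prev_time", F.lag("timestamp_ms").over(w))
)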
df_lag.show()
Current Output
`last_lag_prev_time` holds the previous row's timestamp, rather than the timestamp associated with `last_lag_prev_val`.
timestamp_ms | val | lag_prev_val | last_lag_prev_val | last_lag_prev_time |
---|---|---|---|---|
1672531200000 | 19 | null | null | null |
1672532100000 | 20 | 19 | 19 | 1672531200000 |
1672533000000 | null | 20 | 20 | 1672532100000 |
1672533900000 | 22 | null | 20 | 1672533000000 |
1672534800000 | null | 22 | 22 | 1672533900000 |
1672535700000 | null | null | 22 | 1672534800000 |
1672536600000 | 25 | null | 22 | 1672535700000 |
1672537500000 | 20 | 25 | 25 | 1672536600000 |
1672538400000 | 27 | 20 | 20 | 1672537500000 |
Ideal output
The output I want (differences in bold) is for the `last_lag_prev_time` column to hold the `timestamp_ms` value from the same row as the original `val` that was used to populate `last_lag_prev_val`.
timestamp_ms | val | lag_prev_val | last_lag_prev_val | last_lag_prev_time |
---|---|---|---|---|
1672531200000 | 19 | null | null | null |
1672532100000 | 20 | 19 | 19 | 1672531200000 |
1672533000000 | null | 20 | 20 | 1672532100000 |
1672533900000 | 22 | null | 20 | **1672532100000** |
1672534800000 | null | 22 | 22 | 1672533900000 |
1672535700000 | null | null | 22 | **1672533900000** |
1672536600000 | 25 | null | 22 | **1672533900000** |
1672537500000 | 20 | 25 | 25 | 1672536600000 |
1672538400000 | 27 | 20 | 20 | 1672537500000 |
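
To pin down the logic I'm after, here is a minimal plain-Python sketch (no Spark involved) that reproduces the `last_lag_prev_val` and `last_lag_prev_time` columns of the ideal output from the sample rows. The function name `last_reported_before` is hypothetical and only there for illustration; it is a reference for the expected semantics, not a Spark solution.

```python
from typing import List, Optional, Tuple

# Sample rows from the question: (timestamp_ms, val)
rows: List[Tuple[int, Optional[str]]] = [
    (1672531200000, "19"),
    (1672532100000, "20"),
    (1672533000000, None),
    (1672533900000, "22"),
    (1672534800000, None),
    (1672535700000, None),
    (1672536600000, "25"),
    (1672537500000, "20"),
    (1672538400000, "27"),
]


def last_reported_before(rows):
    """Return (timestamp_ms, last_lag_prev_val, last_lag_prev_time) per row.

    Only rows strictly before the current one are considered, and the
    value and its timestamp are always taken from the same source row.
    """
    out = []
    last_val: Optional[str] = None
    last_time: Optional[int] = None
    for ts, val in sorted(rows, key=lambda r: r[0]):
        # Emit what was known before this row was seen
        out.append((ts, last_val, last_time))
        # Then remember this row if it actually reported a value
        if val is not None:
            last_val, last_time = val, ts
    return out


result = last_reported_before(rows)
for r in result:
    print(r)
```

In PySpark, I assume the same idea could be expressed by keeping `timestamp_ms` only where `val` is non-null, e.g. `F.last(F.when(F.col("val").isNotNull(), F.col("timestamp_ms")), ignorenulls=True)`, over a window ordered by `timestamp_ms` with `rowsBetween(Window.unboundedPreceding, -1)`, but I have not confirmed that this is the idiomatic way to do it.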