I have a timeseries:
import polars as pl

ts = pl.DataFrame(
    {
        "timestamp": [0, 1, 2, 3, 4, 5],
        "start_index": [0, None, None, 3, None, None],
        "end_index": [4, None, None, 4, None, None],
    }
)
For each timestamp index, I want to calculate the number of concurrent events. For example:
- At timestamp 0, an event starts at the end of timestamp 0 and ends at the end of timestamp 4, for a span of [1, 2, 3, 4].
- At timestamp 3, another event starts at the end of timestamp 3 and ends at the end of timestamp 4, for a span of [4].
The first event’s span overlaps with the second event’s during timestamps 3-4, so the number of concurrent events at timestamp 4 is 2. We assign 0 to observations that have no active event.
Expected output:
shape: (6, 4)
┌───────────┬─────────────┬───────────┬────────────────────────────┐
│ timestamp ┆ start_index ┆ end_index ┆ number_of_concurrent_event │
│ ---       ┆ ---         ┆ ---       ┆ ---                        │
│ i64       ┆ i64         ┆ i64       ┆ i64                        │
╞═══════════╪═════════════╪═══════════╪════════════════════════════╡
│ 0         ┆ 0           ┆ 4         ┆ 0                          │
│ 1         ┆ null        ┆ null      ┆ 1                          │
│ 2         ┆ null        ┆ null      ┆ 1                          │
│ 3         ┆ 3           ┆ 4         ┆ 1                          │
│ 4         ┆ null        ┆ null      ┆ 2                          │
│ 5         ┆ null        ┆ null      ┆ 0                          │
└───────────┴─────────────┴───────────┴────────────────────────────┘
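For reference, the counting rule can be sanity-checked with a brute-force loop over the two (start_index, end_index) pairs (a quick sketch of the definition, not the Polars solution I’m after):

# an event is active over the half-open span (start_index, end_index]:
# from just after its start timestamp through the end of its end timestamp
events = [(0, 4), (3, 4)]
print([sum(s < t <= e for s, e in events) for t in range(6)])
# [0, 1, 1, 1, 2, 0]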
A signal-analysis trick is to map every event start to +1 and every event end to -1, then take the cumulative sum of everything in between. In your data, since events can end simultaneously, we’ll need to step through a .value_counts() to grab the correct number of ending events at a given timestamp. This has the advantage of not needing to generate a sparser timeseries: it relies only on the dense timepoints that already exist.
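Before the Polars version, here is the same cumulative-sum idea on plain Python lists; the deltas are hand-derived from ts (+1 at timestamps 0 and 3 for the starts, -2 at timestamp 4 for the two ends):

deltas = [1, 0, 0, 1, -2, 0]   # +1 per start, -1 per end, summed per timestamp
shifted = [0] + deltas[:-1]    # offset by one row: spans begin after the start row
out, running = [], 0
for d in shifted:              # running total = concurrent events per timestamp
    running += d
    out.append(running)
print(out)  # [0, 1, 1, 1, 2, 0]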
import polars as pl
from polars import col

ts = pl.DataFrame({
    "timestamp": [0, 1, 2, 3, 4, 5],
    "start_index": [0, None, None, 3, None, None],
    "end_index": [4, None, None, 4, None, None],
})

print(
    ts
    .join(
        ts.select(  # how many events end at each timestamp
            col('end_index').filter(col('end_index').is_not_null())
            .value_counts()
        ).unnest('end_index'),
        left_on='timestamp',
        right_on='end_index',
        how='outer',
    )
    .select(
        *ts.columns,
        col('start_index').is_not_null().cast(pl.Int32)  # +1 for each starting event
        .add(col('count').mul(-1).fill_null(0))          # -1 for each ending event
        .shift(fill_value=0)  # offsets the start/stop by 1 row
        .cum_sum()
        .alias('concurrent')
    )
)
# shape: (6, 4)
# ┌───────────┬─────────────┬───────────┬────────────┐
# │ timestamp ┆ start_index ┆ end_index ┆ concurrent │
# │ ---       ┆ ---         ┆ ---       ┆ ---        │
# │ i64       ┆ i64         ┆ i64       ┆ i64        │
# ╞═══════════╪═════════════╪═══════════╪════════════╡
# │ 0         ┆ 0           ┆ 4         ┆ 0          │
# │ 1         ┆ null        ┆ null      ┆ 1          │
# │ 2         ┆ null        ┆ null      ┆ 1          │
# │ 3         ┆ 3           ┆ 4         ┆ 1          │
# │ 4         ┆ null        ┆ null      ┆ 2          │
# │ 5         ┆ null        ┆ null      ┆ 0          │
# └───────────┴─────────────┴───────────┴────────────┘
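If the frame being joined is hard to picture, it can be printed on its own: value_counts() yields a struct column that unnest() splits apart. (The count field name is what recent Polars versions produce; older versions named it counts.)

print(
    ts.select(
        col('end_index').filter(col('end_index').is_not_null())
        .value_counts()
    ).unnest('end_index')
)
# shape: (1, 2)
# ┌───────────┬───────┐
# │ end_index ┆ count │
# │ ---       ┆ ---   │
# │ i64       ┆ u32   │
# ╞═══════════╪═══════╡
# │ 4         ┆ 2     │
# └───────────┴───────┘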
Here is a more generic approach if your data can have events that begin at the same start_index.
import polars as pl
from polars import col

ts = pl.DataFrame({
    "timestamp": [0, 1, 2, 3, 4, 5],
    "start_index": [0, None, None, 3, None, None],
    "end_index": [4, None, None, 4, None, None],
})

print(
    pl.concat(
        items=[
            ts,
            ts.select(  # count the starting indices
                col('start_index').value_counts()
                .struct.rename_fields(['timestamp', 'starts'])
            ).unnest('start_index'),
            ts.select(  # count the ending indices
                col('end_index').value_counts()
                .struct.rename_fields(['timestamp', 'ends'])
            ).unnest('end_index'),
        ],
        how='align',  # align the three frames on the shared 'timestamp' column
    )
    .drop_nulls('timestamp')  # discard the rows that counted the nulls
    .select(
        *ts.columns,
        concurrent=(
            pl.sum_horizontal(col('starts'), -1 * col('ends'))
            .shift(fill_value=0)
            .cum_sum()
        )
    )
)
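To see why this handles duplicate starts, consider a hypothetical input with two events sharing start_index 0 (one running 0→4, one 0→2). The row placement of the indices doesn’t matter to the pipeline, only the value counts do:

ts = pl.DataFrame({
    "timestamp": [0, 1, 2, 3, 4, 5],
    "start_index": [0, 0, None, None, None, None],  # two events start at 0
    "end_index": [4, 2, None, None, None, None],    # ending at 4 and 2
})
# re-running the concat/align pipeline above should give
# concurrent = [0, 2, 2, 1, 1, 0]: both events are active over
# timestamps 1-2, and only the 0→4 event remains over 3-4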
I would start out by creating a new frame that expands each start_index/end_index pair into individual rows, one per covered timestamp. From that I’d do an outer_coalesce join to the original, and then group by the timestamp, taking the non-null count of original_timestamp as the concurrent events.
(
    ts
    .filter(pl.col('start_index').is_not_null() & pl.col('end_index').is_not_null())
    .select(
        original_timestamp="timestamp",
        timestamp=pl.int_ranges(  # one element per timestamp the event covers
            pl.col('start_index') + 1,
            pl.col('end_index') + 1,
        ),
    )
    .explode('timestamp')
    .join(ts,
        on='timestamp',
        how='outer_coalesce'
    )
    .group_by('timestamp', maintain_order=True)
    .agg(
        pl.col('start_index', 'end_index').drop_nulls().first(),
        pl.col('original_timestamp').is_not_null().sum()
        .alias('number_of_concurrent_event'),
    )
    .sort('timestamp')  # the join output order is not guaranteed
)
shape: (6, 4)
┌───────────┬─────────────┬───────────┬────────────────────────────┐
│ timestamp ┆ start_index ┆ end_index ┆ number_of_concurrent_event │
│ ---       ┆ ---         ┆ ---       ┆ ---                        │
│ i64       ┆ i64         ┆ i64       ┆ u32                        │
╞═══════════╪═════════════╪═══════════╪════════════════════════════╡
│ 0         ┆ 0           ┆ 4         ┆ 0                          │
│ 1         ┆ null        ┆ null      ┆ 1                          │
│ 2         ┆ null        ┆ null      ┆ 1                          │
│ 3         ┆ 3           ┆ 4         ┆ 1                          │
│ 4         ┆ null        ┆ null      ┆ 2                          │
│ 5         ┆ null        ┆ null      ┆ 0                          │
└───────────┴─────────────┴───────────┴────────────────────────────┘
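For clarity, the intermediate frame built by the int_ranges/explode step (before the join) expands each event into one row per covered timestamp:

print(
    ts
    .filter(pl.col('start_index').is_not_null() & pl.col('end_index').is_not_null())
    .select(
        original_timestamp="timestamp",
        timestamp=pl.int_ranges(pl.col('start_index') + 1, pl.col('end_index') + 1),
    )
    .explode('timestamp')
)
# shape: (5, 2)
# ┌────────────────────┬───────────┐
# │ original_timestamp ┆ timestamp │
# │ ---                ┆ ---       │
# │ i64                ┆ i64       │
# ╞════════════════════╪═══════════╡
# │ 0                  ┆ 1         │
# │ 0                  ┆ 2         │
# │ 0                  ┆ 3         │
# │ 0                  ┆ 4         │
# │ 3                  ┆ 4         │
# └────────────────────┴───────────┘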
I think this is basically the same as @Cameron-Riddell’s approach – just expressed a little differently.
ts = ts.with_columns(starts=pl.col("start_index").is_not_null())

(
    ts.join(ts, left_on="timestamp", right_on="end_index", how="left")
    .rename({"starts_right": "ends"})
    .group_by("timestamp", maintain_order=True)
    .agg(
        pl.exclude("starts", "ends").first(),
        pl.col("starts", "ends").sum(),
    )
    .with_columns(
        concurrent=(pl.col("starts") + (pl.col("ends") * -1))
        .cum_sum()
        .shift(fill_value=0)  # fill_value=0 gives 0 (not null) at the first row
    )
)
shape: (6, 8)
┌───────────┬─────────────┬───────────┬─────────────────┬───────────────────┬────────┬──────┬────────────┐
│ timestamp ┆ start_index ┆ end_index ┆ timestamp_right ┆ start_index_right ┆ starts ┆ ends ┆ concurrent │
│ ---       ┆ ---         ┆ ---       ┆ ---             ┆ ---               ┆ ---    ┆ ---  ┆ ---        │
│ i64       ┆ i64         ┆ i64       ┆ i64             ┆ i64               ┆ u32    ┆ u32  ┆ i64        │
╞═══════════╪═════════════╪═══════════╪═════════════════╪═══════════════════╪════════╪══════╪════════════╡
│ 0         ┆ 0           ┆ 4         ┆ null            ┆ null              ┆ 1      ┆ 0    ┆ 0          │
│ 1         ┆ null        ┆ null      ┆ null            ┆ null              ┆ 0      ┆ 0    ┆ 1          │
│ 2         ┆ null        ┆ null      ┆ null            ┆ null              ┆ 0      ┆ 0    ┆ 1          │
│ 3         ┆ 3           ┆ 4         ┆ null            ┆ null              ┆ 1      ┆ 0    ┆ 1          │
│ 4         ┆ null        ┆ null      ┆ 0               ┆ 0                 ┆ 0      ┆ 2    ┆ 2          │
│ 5         ┆ null        ┆ null      ┆ null            ┆ null              ┆ 0      ┆ 0    ┆ 0          │
└───────────┴─────────────┴───────────┴─────────────────┴───────────────────┴────────┴──────┴────────────┘
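If the leftover join columns are unwanted, a final select trims the frame to the expected shape (result here is just my name for the chain above, assigned to a variable):

result.select(
    "timestamp", "start_index", "end_index",
    pl.col("concurrent").alias("number_of_concurrent_event"),  # rename to match the question
)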