RxJS pros out there, I need your help!
Based on my last question I’m now looking for something different:
Question: How to separate values of a stream, keep the last of each and force flush after a certain time?
I have a solution which works totally fine, but it doesn’t feel right / optimized.
This is my current solution in two steps.
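Step 1 (preparation): obs$ stands in for a stream of arbitrary numbers by emitting the values of list, one every 100 milliseconds
import { timer } from 'rxjs';
import { bufferTime, filter, groupBy, map, mergeMap, take, tap } from 'rxjs/operators'; // operators used in both steps

const list = [1, 1, 1, 1, 3];
const obs$ = timer(0, 100).pipe(
  take(list.length),
  map(x => list[x])
);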
// => 1 - [100ms] - 1 - [100ms] - 1 - [100ms] - 1 - [100ms] - 3
Step 2: split by number, collect data, flush after 300ms
obs$.pipe(
  map((x, idx) => ({ item: x, idx })), // debug #1: tag each value with its index
  tap(x => console.log(x.idx + " => " + x.item)), // debug #2: log every incoming value
  groupBy(x => x.item),
  mergeMap(grouped =>
    // ⚠️ the ugly part
    grouped.pipe(
      bufferTime(300), // collect this group's values for 300 ms
      filter(buf => !!buf.length), // skip empty buffers
      map(buf => buf[buf.length - 1]) // keep only the last value
    )
  )
)
.subscribe(x => console.log("✅ " + x.idx + " => " + x.item));
Result
0 => 1
1 => 1
2 => 1
✅ 2 => 1
3 => 1
4 => 3
✅ 3 => 1
✅ 4 => 3
It works! We always get the last value of each kind, and additionally one value every 300ms!
Even though it does the job, it has two downsides:
- “⚠️ the ugly part” is ugly. Feels like there has to be an alternative.
- bufferTime() “collects” data even though the data is always the same. E.g. here it saved [1,1,1].
So, is there a leaner way to achieve the same?
Note: grouped.pipe(debounceTime(300)) won’t do the same as my code. It won’t output “✅ 2 => 1” because the value 1 keeps coming in. So basically I’m looking for a debounce with a “forced flush every x seconds”.
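To pin down what "debounce with a forced flush" means here, this is a minimal, library-free sketch of the intended behaviour, written in TypeScript. It is not RxJS and not a proposed solution: the names TimedEvent, Slot and latestPerItemWithFlush are hypothetical, and it assumes events arrive sorted by a timestamp t in milliseconds. It keeps a single latest value per item instead of a buffer, flushes an item once its 300ms window has elapsed, and flushes whatever is still pending when the input ends. Unlike bufferTime, each window starts at the first pending value rather than at subscription; for the sample input both give the same result.

```typescript
// Hypothetical shapes for this sketch only; `t` is the arrival time in ms.
interface TimedEvent<T> { t: number; idx: number; item: T; }
interface Slot<T> { deadline: number; latest: TimedEvent<T>; }

// Keeps only the most recent event per distinct item and emits it when the
// window opened by that item's first pending event has elapsed.
// Assumes `events` is sorted by `t`.
function latestPerItemWithFlush<T>(events: TimedEvent<T>[], windowMs: number): TimedEvent<T>[] {
  let pending: Slot<T>[] = [];
  const out: TimedEvent<T>[] = [];

  for (const ev of events) {
    // Forced flush: emit every slot whose window has ended by the time `ev` arrives.
    const stillOpen: Slot<T>[] = [];
    for (const slot of pending) {
      if (slot.deadline <= ev.t) out.push(slot.latest);
      else stillOpen.push(slot);
    }
    pending = stillOpen;

    // Overwrite the pending value for this item, or open a new window for it.
    let match: Slot<T> | undefined;
    for (const slot of pending) {
      if (slot.latest.item === ev.item) match = slot;
    }
    if (match) match.latest = ev;
    else pending.push({ deadline: ev.t + windowMs, latest: ev });
  }

  // End of input: flush whatever is still pending.
  for (const slot of pending) out.push(slot.latest);
  return out;
}

// Same data as obs$ above: one value every 100 ms.
const sampleList = [1, 1, 1, 1, 3];
const sampleEvents = sampleList.map((item, idx) => ({ t: idx * 100, idx, item }));
const flushed = latestPerItemWithFlush(sampleEvents, 300);
const flushedLines = flushed.map(e => "✅ " + e.idx + " => " + e.item);
console.log(flushedLines.join("\n"));
// ✅ 2 => 1
// ✅ 3 => 1
// ✅ 4 => 3
```

The three flushed values match the ✅ lines in the result above, and at no point is more than one value per item held in memory.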