Objective
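I am trying to aggregate (sum the values per key of) a sequence of messages of type (int Key, int Value) until a closing observable emits a “flush” marker item. Each flush should emit the per-key sums accumulated since the previous flush, and the next aggregation then starts from zero.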
For example, given a sequence of (Key, Value) items
(1,1) - (2,3) - (1,3) - (Flush) - (2,1) - (2,5) - (Complete)
when Flush is triggered I expect an array [(1,4), (2,3)] to be emitted. When the sequence completes, an array [(2,6)] should be emitted.
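To pin down the expected semantics independently of Rx, here is a minimal plain-LINQ model of them. It assumes a hypothetical encoding in which null stands for the Flush marker and the end of the array stands for Complete; ExpectedBatches and SumByKey are illustrative names, not an existing API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical encoding for this sketch: null stands for the Flush marker,
// and the end of the array stands for Complete.
var events = new (int Key, int Value)?[] { (1, 1), (2, 3), (1, 3), null, (2, 1), (2, 5) };

foreach (var batch in ExpectedBatches(events))
    Console.WriteLine($"[{string.Join(", ", batch)}]");

// Splits the events at each Flush marker (and at the end) and sums values per key.
static List<(int Key, int Value)[]> ExpectedBatches(IEnumerable<(int Key, int Value)?> events)
{
    var result = new List<(int Key, int Value)[]>();
    var current = new List<(int Key, int Value)>();

    foreach (var e in events)
    {
        if (e.HasValue)
        {
            current.Add(e.Value);      // a regular (Key, Value) message
            continue;
        }
        result.Add(SumByKey(current)); // Flush: close the current batch...
        current.Clear();               // ...and start the next one from zero
    }
    result.Add(SumByKey(current));     // Complete: emit whatever is left
    return result;
}

// LINQ to Objects GroupBy keeps keys in the order they first appear.
static (int Key, int Value)[] SumByKey(List<(int Key, int Value)> items) =>
    items.GroupBy(i => i.Key)
         .Select(g => (Key: g.Key, Value: g.Sum(i => i.Value)))
         .ToArray();
```

This prints [(1, 4), (2, 3)] followed by [(2, 6)], which is exactly the output the Rx pipeline should reproduce.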
What I’ve tried
I started with GroupBy + Aggregate + Buffer(flush) as an intuitive way of achieving the required behavior. However, the sequence does not emit intermediate results; all the aggregates arrive together as the final output.
Here’s the full code:
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();

var completion = StartProcessing(source, flush);

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted();       // emit of [(2,6)] is expected

await completion;
return;

static Task StartProcessing(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush)
{
    return source
        .GroupBy
        (
            keySelector: message => message.Key
        )
        .SelectMany
        (
            group => group
                .Aggregate
                (
                    seed: (group.Key, Value: 0),
                    accumulator: (output, item) => (output.Key, output.Value + item.Value)
                )
        )
        .Buffer(flush)
        .Select(buffer => Observable.FromAsync(() => Flush(buffer)))
        .Merge()
        .ToTask();
}

static async Task Flush(IEnumerable<(int Key, int Value)> data)
{
    Console.WriteLine($"Flushing [{string.Join(", ", data)}]");
    // processing takes time
    await Task.Delay(TimeSpan.FromSeconds(1));
}
The output is
Flushing []
Flushing [(1, 4), (2, 9)]
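As far as I understand it (not sure, though), Buffer does emit on flush, but the buffer is still empty at that point: Aggregate emits a group’s total only when that group completes, and the groups complete only when the source completes. That would explain why the first flush is empty and the second one contains the totals of the whole sequence.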
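Following that reasoning, one way around it is to swap the order: Buffer the raw messages on flush first, and only then sum per key inside each buffer with LINQ to Objects. This is a minimal sketch assuming the same System.Reactive package as above; BufferThenSum is a hypothetical helper name, and the results are collected with ToList instead of the question's Flush method to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();

// ToTask subscribes immediately, so the subjects below already have an observer.
var completion = BufferThenSum(source, flush).ToList().ToTask();

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default);
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted();

var batches = await completion;
foreach (var batch in batches)
    Console.WriteLine($"[{string.Join(", ", batch)}]");

static IObservable<(int Key, int Value)[]> BufferThenSum(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush) =>
    source
        // Collect raw messages; Buffer emits a list on every flush
        // and a final list when the source completes.
        .Buffer(flush)
        // Sum per key inside each buffer with plain LINQ to Objects
        // (GroupBy keeps the order in which keys first appear).
        .Select(buffer => buffer
            .GroupBy(m => m.Key)
            .Select(g => (Key: g.Key, Value: g.Sum(m => m.Value)))
            .ToArray());
```

Driven as in the question, this prints [(1, 4), (2, 3)] and then [(2, 6)]. The question's .Select(buffer => Observable.FromAsync(() => Flush(buffer))).Merge() tail could be appended unchanged; Concat instead of Merge would run the Flush calls one at a time. The trade-off is that Buffer keeps every raw message until the next flush, and a flush with no messages yields an empty array (a Where(batch => batch.Length > 0) would drop those).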
I also tried GroupByUntil instead of GroupBy and Buffer. With that structure I get intermediate output when each group closes, but each group’s aggregate is emitted on its own, and it is unclear how to bundle them so that they are not flushed separately:
Flushing (1, 4)
Flushing (2, 3)
Flushing (2, 6)
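Closer to the GroupByUntil attempt, Window(flush) could provide the missing bundling: each window spans the messages between two flushes, the original GroupBy + Aggregate runs inside each window, and ToArray collects that window's group totals into one array. This is a sketch assuming System.Reactive; AggregatePerWindow is a hypothetical name.

```csharp
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();
var completion = AggregatePerWindow(source, flush).ToList().ToTask();

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default);
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted();

foreach (var batch in await completion)
    Console.WriteLine($"[{string.Join(", ", batch)}]");

static IObservable<(int Key, int Value)[]> AggregatePerWindow(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush) =>
    source
        // Each flush completes the current window and opens a new one;
        // source completion completes the last window.
        .Window(flush)
        // SelectMany subscribes to every window as soon as it opens,
        // so no message is missed.
        .SelectMany(window => window
            .GroupBy(m => m.Key)
            // Completing the window completes its groups, so Aggregate
            // now emits at every flush instead of only at the very end.
            .SelectMany(group => group.Aggregate(
                seed: (Key: group.Key, Value: 0),
                accumulator: (acc, m) => (acc.Key, acc.Value + m.Value)))
            // Bundle all per-key totals of this window into one array.
            .ToArray());
```

Unlike the Buffer variant, this keeps only one running total per key rather than every raw message. The inner query is attached with SelectMany rather than Concat because window observables are hot, so a window subscribed late could miss messages. I would not rely on the order of keys inside each array, since it follows the order in which GroupBy completes its groups.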
I’m stuck at this point and would appreciate some help, specifically:
- What is considered the correct (idiomatic Rx) way of implementing the required behavior?
- Can it be achieved with built-in operators, or should I create a custom one (e.g. a custom AggregateUntil)?
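For comparison, a custom AggregateUntil-style operator could be written with Observable.Create. This is a hypothetical sketch, not an Rx built-in: it keeps running per-key sums, emits a snapshot array on every flush and once more on completion, and uses a lock to serialize the source and flush callbacks, which may arrive on different threads.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();
var completion = AggregateUntil(source, flush).ToList().ToTask();

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default);
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted();

foreach (var batch in await completion)
    Console.WriteLine($"[{string.Join(", ", batch)}]");

// Hypothetical custom operator (not part of Rx): running per-key sums,
// snapshot emitted on every flush and once more on source completion.
static IObservable<(int Key, int Value)[]> AggregateUntil(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush) =>
    Observable.Create<(int Key, int Value)[]>(observer =>
    {
        var gate = new object();                     // serializes source and flush callbacks
        var positions = new Dictionary<int, int>();  // key -> index in 'sums'
        var sums = new List<(int Key, int Value)>(); // totals in first-seen key order
        var done = false;

        void Add((int Key, int Value) m)
        {
            if (positions.TryGetValue(m.Key, out var i))
                sums[i] = (m.Key, sums[i].Value + m.Value);
            else
            {
                positions[m.Key] = sums.Count;
                sums.Add(m);
            }
        }

        void Emit()
        {
            var batch = sums.ToArray(); // snapshot, then reset for the next batch
            sums.Clear();
            positions.Clear();
            observer.OnNext(batch);
        }

        void Fail(Exception ex)
        {
            if (done) return;
            done = true;
            observer.OnError(ex);
        }

        var flushSubscription = flush.Subscribe(
            _ => { lock (gate) { if (!done) Emit(); } },
            ex => { lock (gate) Fail(ex); });

        var sourceSubscription = source.Subscribe(
            m => { lock (gate) { if (!done) Add(m); } },
            ex => { lock (gate) Fail(ex); },
            () =>
            {
                lock (gate)
                {
                    if (done) return;
                    done = true;
                    Emit();                 // final batch on completion
                    observer.OnCompleted();
                }
            });

        return new CompositeDisposable(flushSubscription, sourceSubscription);
    });
```

Calling the observer while holding the lock keeps the sketch short but could deadlock if a downstream subscriber blocks on another thread; composing the built-in Buffer or Window operators avoids having to get these details right by hand.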