In Spring Integration, I am building an IntegrationFlow in which a single event triggers a chain of operations, including a splitter and a filter. I would like to detect when all of the messages produced by the splitter have been processed.
    IntegrationFlow
        .from(flux)
        [... other operators ...]
        .split()
        [... other operators ...]
        .filter<MailVertex?> ({ it != null }) { it.discardChannel(NullChannel()) }
        [... other operators ...]
        .get()
The single event that triggers the entire integration flow is a collection (let's assume it has 3 elements). The collection is then split, so 3 messages flow downstream. After processing them, we filter, and only two of the three are non-null. Therefore, only two messages reach the end of the flow.
The flow can be processed in parallel, so we cannot assume anything about the order of the messages.
How can I run an action exactly once per initial event (the collection of 3 elements), after both of the messages that pass the filter have been processed?
Normally, we could use a splitter followed by an aggregator, which knows from the correlation ID, sequence number, and sequence size whether all messages have been processed. However, in my case there is a filter after the splitter, so not every message the splitter produces will reach the end of the flow.
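To make the problem concrete, here is a minimal sketch in plain Kotlin of a release rule based on sequence size. It is only a simplified model and not Spring Integration's aggregator: `Part` and `SequenceSizeGroups` are hypothetical names I made up for illustration, and the payloads are placeholder strings.

```kotlin
// Hypothetical stand-in for a split message; not a Spring Integration type.
data class Part(
    val correlationId: String,
    val sequenceNumber: Int,
    val sequenceSize: Int,
    val payload: String?
)

// Simplified model of a sequence-size release rule: a group counts as complete
// only when it holds as many messages as the splitter announced.
class SequenceSizeGroups {
    private val groups = mutableMapOf<String, MutableList<Part>>()

    // Adds a part to its group and returns true once the group is complete.
    fun add(part: Part): Boolean {
        val group = groups.getOrPut(part.correlationId) { mutableListOf() }
        group.add(part)
        return group.size == part.sequenceSize
    }
}

fun main() {
    // One initial event with 3 elements; the second one ends up null after processing.
    val event = listOf("a", null, "c")

    // "Split": every part carries the same correlation id and a sequence size of 3.
    val parts = event.mapIndexed { i, p -> Part("event-1", i + 1, event.size, p) }

    // "Filter": the null part is dropped, as with a discard to a null channel.
    val survivors = parts.filter { it.payload != null }

    val groups = SequenceSizeGroups()
    val released = survivors.map { groups.add(it) }

    // Only 2 of the 3 announced parts arrive, so the group is never complete.
    println(released) // prints [false, false]
}
```

With this rule the "all done" action never fires for the event, which is exactly the situation I am trying to work around.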