I have a gRPC service that offers an API that looks like this:
def f(input: fs2.Stream[F, Input]): fs2.Stream[F, Output]
The Input type has a Cancel element that I can send to close the stream properly in case of interruption.
The issue is that I can find no way to append a final Cancel element to the input stream when it is cancelled, the way .onComplete would on normal termination. This is annoying because sometimes I want to be able to interrupt the output stream (which results in the interruption of the input stream).
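To make the comparison concrete, here is a minimal sketch of what .onComplete gives me on normal termination. The Input, Data and Cancel definitions are stand-ins assumed for illustration; the real types come from the gRPC service.

```scala
import fs2.{Pure, Stream}

object OnCompleteDemo {
  // Hypothetical stand-ins for the service's input messages.
  sealed trait Input
  final case class Data(n: Int) extends Input
  case object Cancel extends Input

  // A finite input that terminates normally.
  val input: Stream[Pure, Input] = Stream(Data(1), Data(2))

  // onComplete appends the second stream once the first one has ended.
  val withCancel: Stream[Pure, Input] = input.onComplete(Stream(Cancel))

  def main(args: Array[String]): Unit =
    println(withCancel.toList) // List(Data(1), Data(2), Cancel)
}
```

When the stream is interrupted from downstream instead, the appended part does not seem to run, and that is the case I am trying to cover.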
I tried a solution using mergeHaltBoth combined with a Deferred that is completed in the onFinalize of the input stream, but the stream waiting on the Deferred is cancelled before the completion is executed, and I can find no way to make a stream uninterruptible.
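The attempt could be sketched roughly as follows. This is a reconstruction under assumptions rather than the exact code: it assumes fs2 3.x with cats-effect 3, and the helper name appendCancelOnFinalize and its cancel parameter are hypothetical.

```scala
import cats.effect.{Concurrent, Deferred}
import cats.syntax.functor._
import fs2.Stream

object Attempt {
  // Hypothetical helper: tries to emit `cancel` once `input` has been finalized.
  def appendCancelOnFinalize[F[_]: Concurrent, I](
      input: Stream[F, I],
      cancel: I
  ): Stream[F, I] =
    Stream.eval(Deferred[F, Unit]).flatMap { finalized =>
      // Waits for the Deferred, then emits the single `cancel` element.
      val cancelAfterFinalize: Stream[F, I] =
        Stream.eval(finalized.get).as(cancel)

      input
        .onFinalize(finalized.complete(()).void) // signal that `input` was finalized
        .mergeHaltBoth(cancelAfterFinalize)      // halts as soon as either side halts
    }
}
```

As far as I can tell, the finalizer only runs while the merged stream is already shutting down, so the side waiting on the Deferred is stopped before it can emit Cancel.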
The ideal API would be something similar to .onComplete, like:
.onInterruption(finalElements: fs2.Stream[F, O]): fs2.Stream[F, O]