I need decompress data using compression not supported by akka, but supported by other library which provided InputStream interface.
To make it work with akka stream, I need to implement function:
def pipeThroughInputStream(pipeThrough: InputStream => InputStream): Flow[ByteString, ByteString, NotUsed]
But I’m not sure how to do it.
I’m aware about conversion functions like StreamConverters.asInputStream
and StreamConverters.fromInputStream
but I don’t know how to apply them here. All I have come up with so far is
def pipeThroughInputStream(pipeThrough: InputStream => InputStream): Flow[ByteString, ByteString, NotUsed] = {
val sink: Sink[ByteString, Source[ByteString, Future[IOResult]]] = StreamConverters.asInputStream().mapMaterializedValue { materializedInputStream =>
val inputStream = pipeThrough(materializedInputStream)
StreamConverters.fromInputStream(() => inputStream)
}
???
}
But I don’t know how now convert this Sink that materializes to Source back to Flow.
3