I’m attempting to write a service which will handle uploading files to S3. It should calculate a fingerprint for the file, and determine the format of the file to include in the S3 metadata. I’m working with a Stream of Bytes. My first attempt at this looked like this:
```scala
override def upload(bytes: fs2.Stream[F, Byte]): F[DocumentId] = (for {
  now         <- fs2.Stream.eval(Clock[F].realTimeInstant)
  fileFormat  <- detect(bytes)
  fingerprint <- calculate(bytes)
  metadata     = DocumentMetadata(documentType, fileFormat, fingerprint)
  _ <- fs2.Stream.eval(
         s3Service.upload(
           S3Key(metadata.id.value),
           bytes.mapChunks(chunk => Chunk(ByteBuffer.wrap(chunk.toArray))),
           metadata
         )
       )
  documentId <- fs2.Stream.eval(documentRepository.create(metadata.transformInto[DocumentMetadataDb]))
} yield documentId).compile.lastOrError
```
```scala
def detect(bytes: fs2.Stream[F, Byte]): fs2.Stream[F, FileFormat] = bytes
  .through(fs2.io.toInputStream)
  .evalMap(is => Sync[F].delay(tika.detect(is)))
  .map(FileFormat(_))

def calculate(bytes: fs2.Stream[F, Byte]): fs2.Stream[F, String] = bytes
  .through(fs2.hash.sha256)
  .chunks
  .map(digestChunk => FormatUtils.base16.encode(digestChunk.toArray))
```
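For reference, the fingerprint `calculate` is after is just the hex-encoded (base16) SHA-256 digest of all the bytes. A plain-JDK sketch of the same value (the helper name here is illustrative, not part of my service):

```scala
import java.security.MessageDigest

// Digest all the bytes with SHA-256 and hex-encode the result,
// which is what the fs2 pipe above is meant to produce.
def sha256Hex(bytes: Array[Byte]): String =
  MessageDigest.getInstance("SHA-256")
    .digest(bytes)
    .map(b => f"$b%02x")
    .mkString

// Well-known SHA-256 test vector for "abc":
assert(sha256Hex("abc".getBytes("UTF-8")) ==
  "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad")
```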
But I realised this is probably not doing what I expected it to do: `bytes` is consumed separately by `detect`, `calculate`, and the upload, so the source is either re-evaluated several times or the full Stream has to be loaded into memory.
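To make that concern concrete, here is a minimal stdlib sketch (no fs2 involved, all names made up) that models the source as something each separate consumer has to re-run from the start:

```scala
// Hypothetical model: every consumer that wants the bytes re-runs
// the source, like re-pulling a Stream backed by a file or socket.
var pulls = 0
def source(): Iterator[Byte] = {
  pulls += 1
  Iterator(1, 2, 3).map(_.toByte)
}

source().take(2).toList // "detect" only needs a prefix of the bytes
source().toList         // "calculate" needs every byte
source().toList         // the S3 upload needs every byte too

assert(pulls == 3) // three full trips over the source
```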
I’ve been trying to utilise the `broadcastThrough` method on the source Stream (and have also tried constructing a `Topic` manually) to concurrently determine the `fileFormat` of the bytes as well as compute the fingerprint, and then use those two values to continue the stream and upload to S3, with the metadata constructed from the results of the first two pipes (and the original byte stream). But if I run all 3 pipes inside `broadcastThrough`, they have to share a single output type. It seems that this is not possible (?).
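What I mean by "not possible": all the pipes passed to `broadcastThrough` must agree on one output type, so mixing a `FileFormat` pipe with a `String` pipe makes the compiler infer their least upper bound. A stdlib illustration of that inference (`FileFormat` here is a stand-in for my own type):

```scala
case class FileFormat(value: String)

val fileFormat  = FileFormat("application/pdf")
val fingerprint = "9f86d081884c7d65" // stand-in hex digest

// Forcing both values behind one type parameter infers the least
// upper bound of FileFormat and String, which in Scala 2 is
// java.io.Serializable -- the same type I see from broadcastThrough.
val unified = List(fileFormat, fingerprint)

assert(unified.forall(_.isInstanceOf[java.io.Serializable]))
```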
As such, I refactored my service a bit to allow the metadata to be set AFTER the file is uploaded to S3.
```scala
bytes.broadcastThrough(
  detect,
  calculate,
  fs2.Stream.eval(s3Service.upload(S3Key.random(), _)).drain // returns F[Unit]
)
// .chunkN(2, allowFewer = false) // get the two values emitted from detect and calculate
// .map { ... }                   // using the two values, construct the metadata
// .evalMap(metadata => s3Service.copy(documentId, metadata))
```
But again here I run into the same issue of needing to await the values from the `detect` and `calculate` pipes, and since they emit different types, I end up working with a `Stream[F, Serializable]`. This feels inherently wrong, so I guess I'm missing something here.
How could I accomplish what I would like to here?