I came up with the code below.
I want to stream the result of a Doobie query as JSON using http4s. The core problem is that errors (such as a malformed query) only occur during stream evaluation, when http4s has already sent the header (status Ok) and begun to stream the body.
Thus I tried to create a function peekFirstElement, which should materialize any error that happens while fetching the first chunk into the enclosing IO.
I've hit a dead end: the current implementation fails after streaming the first (and only) chunk, with an exception from Postgres saying that the result set has already been closed. And yes, the next() call that delivered the last item of the first and only chunk reported false.
Apparently this information somehow gets lost, although I think it should be contained in the tl variable of the peekFirstElement function.
Any thoughts?

/** Peeks at the first element of the stream to materialize any exceptions that might occur. */
def peekFirstElement[O](stream: Stream[IO, O]): IO[Stream[IO, O]] = {
  val x: Pull[IO, Stream[IO, O], Unit] = stream.pull.uncons.flatMap {
    case Some((first, tl)) =>
      Pull.output1(Stream.chunk(first) ++ tl)
    case None =>
      Pull.output1(Stream.empty)
  }
  x.stream.compile.onlyOrError
}

def routes(conns: List[(Regex, Transactor[IO])]): HttpRoutes[IO] = {
  val config = FileService.Config[IO](absolutePath)
  // Post-process the file content by interpreting it as a query and executing it.
  val pathCollector: FileService.Fs2PathCollector[IO] = (f, cfg, request) => {
    conns.find(_._1.matches(request.pathInfo.toString))
      .map { case (_, transactor) =>
        for {
          content <- config.fs2PathCollector(f, cfg, request)
          query <- OptionT.liftF(content.body.compile.toList.map(bytes => new String(bytes.toArray)))
          _ <- OptionT.liftF(logger[IO].debug(s"executing query: \"$query\""))
          outerQuery = sql"select to_json(querydata) from (${Fragment.const(query)}) as querydata".query[Json]
          queryResult = outerQuery.stream.transact(transactor)
          queryPeeked <- OptionT.liftF(peekFirstElement(queryResult))
          streamList = Stream.emit(Token.StartArray) ++ queryPeeked.through(tokenize) ++ Stream(Token.EndArray)
          response <- OptionT.liftF(
            Ok(streamList.through(fs2.data.json.render.compact).through(fs2.text.utf8.encode)).handleErrorWith(
              e => InternalServerError(e.toString)
            )
          )
        } yield response
      }.getOrElse(config.fs2PathCollector(f, cfg, request))
  }
  fileService[IO](FileService.Config[IO](absolutePath, pathCollector, "", NoopCacheStrategy[IO], 32 * 1024))
}
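
For comparison, here is a minimal sketch of what peekFirstElement is meant to do, using in-memory streams instead of a Doobie query. It assumes fs2 3.x with cats-effect 3. The object PeekSketch and the streams failing and healthy are hypothetical stand-ins made up for illustration, not part of the code above. Since neither stream holds a connection or result set, this only shows the intended behaviour and does not reproduce the "result set already closed" failure.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Pull, Stream}

object PeekSketch {

  // Same definition as in the question, repeated so the sketch is self-contained.
  def peekFirstElement[O](stream: Stream[IO, O]): IO[Stream[IO, O]] = {
    val x: Pull[IO, Stream[IO, O], Unit] = stream.pull.uncons.flatMap {
      case Some((first, tl)) =>
        Pull.output1(Stream.chunk(first) ++ tl)
      case None =>
        Pull.output1(Stream.empty)
    }
    x.stream.compile.onlyOrError
  }

  // Hypothetical stand-in for a query that fails while fetching the first chunk.
  val failing: Stream[IO, Int] =
    Stream.raiseError[IO](new RuntimeException("malformed query"))

  // Hypothetical stand-in for a query that succeeds; it acquires no resources.
  val healthy: Stream[IO, Int] =
    Stream(1, 2, 3).covary[IO]

  def main(args: Array[String]): Unit = {
    // The error surfaces in the enclosing IO, before any response would be built.
    val early = peekFirstElement(failing).attempt.unsafeRunSync()
    println(early.isLeft) // true

    // Without resources involved, the peeked stream still yields every element.
    val items = peekFirstElement(healthy).flatMap(_.compile.toList).unsafeRunSync()
    println(items) // List(1, 2, 3)
  }
}
```

In this resource-free setting the function behaves as intended, which suggests the failure with Doobie is tied to the lifetime of the connection and result set rather than to the uncons logic itself.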