In my akka-streams(scala) application, I am using google pubsub as the source, querying from a BigQuery table and then publishing an outcome to a kafka topic and later acknowledging the pubsub
message. But I am intermittently facing this issue.
Runtime exception encountered: [{}], stopping stream
akka.http.impl.engine.client.OutgoingConnectionBlueprint$UnexpectedConnectionClosureException: The http server closed the connection unexpectedly before delivering responses for 1 outstanding requests
at akka.http.impl.engine.client.OutgoingConnectionBlueprint$.$anonfun$apply$7(OutgoingConnectionBlueprint.scala:123)
at akka.http.impl.engine.client.OutgoingConnectionBlueprint$.$anonfun$apply$7$adapted(OutgoingConnectionBlueprint.scala:123)
at akka.http.impl.util.One2OneBidiFlow$One2OneBidi$$anon$1$$anon$4.onUpstreamFinish(One2OneBidiFlow.scala:97)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:536)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:400)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:787)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
akka.http.impl.engine.client.OutgoingConnectionBlueprint$UnexpectedConnectionClosureException: The http server closed the connection unexpectedly before delivering responses for 1 outstanding requests
at akka.http.impl.engine.client.OutgoingConnectionBlueprint$.$anonfun$apply$7(OutgoingConnectionBlueprint.scala:123)
at akka.http.impl.engine.client.OutgoingConnectionBlueprint$.$anonfun$apply$7$adapted(OutgoingConnectionBlueprint.scala:123)
at akka.http.impl.util.One2OneBidiFlow$One2OneBidi$$anon$1$$anon$4.onUpstreamFinish(One2OneBidiFlow.scala:97)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:536)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:400)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:787)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
I am using below:
for pubsub source: akka.stream.alpakka.googlecloud.pubsub.scaladsl.GooglePubSub
for bigquery source: akka.stream.alpakka.googlecloud.bigquery.scaladsl.BigQuery
for bigquery response serialization: defined custom akka.stream.alpakka.googlecloud.bigquery.scaladsl.spray.BigQueryRootJsonFormat
for kafka publish sink: akka.kafka.scaladsl.SendProducer
I have tried setting below configs:
BigQuery request timeout: 3minutes
akka.kafka.producer {
discovery-method = akka.discovery
topic = "mytopic"
service-name = ""
resolve-timeout = 3 seconds
parallelism = 10000
close-timeout = 60s
close-on-producer-stop = true
use-dispatcher = "akka.kafka.default-dispatcher"
eos-commit-interval = 100ms
kafka-clients {
//my kafka-client configs
}
}
akka.http{
idle-timeout = 180s
}
akka.http.host-connection-pool.max-open-requests = 256
akka {
http {
client {
# Timeout for the entire request
request-timeout = 180s
# Timeout for the idle connection
idle-timeout = 180s
# Timeout for connecting to the server
connecting-timeout = 180s
}
}
}
I have tried setting the idle-timeout for bigquery explicitely, but that doesn’t cause this issue, rather throws a bigquery idle-timeout. None of the above configs seem to fix this issue.
I am new to akka-http, would really appreciate any help/suggestions on this.