I have an HTTP verticle that sends messages over the event bus to a worker verticle.
The worker verticle has a setPeriodic timer that runs a fairly long job every hour. While that job is running, the event bus consumer on the worker verticle does not receive any messages from the HTTP verticle.
I can see that the periodic job runs on a worker thread, so I'm a little confused about why the event bus seems to block in this scenario and eventually times out after 30s.
Is this expected?
- There is 1 instance of the worker verticle.
See the code example and logs from a test run below. I generated a basic application with the vertx.io starter and added some code; I've provided the Gradle config as well.
package com.example.starter

import io.vertx.core.DeploymentOptions
import io.vertx.core.ThreadingModel
import io.vertx.core.Vertx
import io.vertx.core.http.HttpServer
import io.vertx.core.json.Json
import io.vertx.ext.web.Router
import io.vertx.kotlin.coroutines.CoroutineRouterSupport
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.awaitResult
import org.apache.logging.log4j.kotlin.logger
import java.time.Instant
import kotlin.random.Random

class MainVerticle : CoroutineVerticle(), CoroutineRouterSupport {
    private val port = 8888
    private val random = Random(System.currentTimeMillis())
    private val logger = logger(this::class.simpleName!!)

    override suspend fun stop() {
        logger.info("${this::class.java.simpleName} stopped on $port")
        super.stop()
    }

    @Throws(Exception::class)
    override suspend fun start() {
        awaitResult<HttpServer> {
            vertx.createHttpServer()
                .requestHandler(routes())
                .listen(port, it)
            logger.info("${this::class.java.simpleName} listening on $port")
        }
    }

    private fun routes(): Router {
        val router = Router.router(vertx)
        router.post("/run").coHandler { ctx ->
            logger.info("Receive request on ${ctx.request().path()}, currentTime: ${Instant.now()}")
            vertx.eventBus().request<RunEvent>("run-event", Json.encode(RunEvent(id = random.nextInt()))) {
                if (it.succeeded()) {
                    logger.info("Eventbus response received, responding to client with ${it.result().body()}")
                    ctx.end("Result: ${it.result().body()}")
                } else {
                    ctx.fail(it.cause())
                }
            }
        }
        return router
    }
}

class WorkerVerticle : CoroutineVerticle() {
    private val sleep = 5_000L
    private val logger = logger(this::class.simpleName!!)

    override suspend fun start() {
        super.start()
        vertx.eventBus().consumer<RunEvent>("run-event") { message ->
            try {
                logger.info("Received on eventbus consumer: ${message.body()}, ${Instant.now()}")
                message.reply(Json.encode(RunEventResult(true, Instant.now())))
            } catch (e: Exception) {
                logger.info("Failed to handle run event: ${Instant.now()} - ${e.message}")
                message.fail(400, Json.encode(RunEventResult(false, Instant.now())))
            }
        }
        vertx.setPeriodic(10_000, 60_000) {
            logger.info("Before Periodic, will sleep for $sleep ms, now: ${Instant.now()}")
            Thread.sleep(sleep)
            logger.info("After Periodic, now: ${Instant.now()}")
        }
    }
}

data class RunEvent(val id: Int, val runEventCreatedAt: Instant = Instant.now())
data class RunEventResult(val success: Boolean, val runEventResultCreatedAt: Instant = Instant.now())

fun main() {
    val vertx = Vertx.vertx()
    vertx.deployVerticle(
        MainVerticle(), DeploymentOptions()
            .setThreadingModel(ThreadingModel.EVENT_LOOP)
            .setInstances(1)
    )
    vertx.deployVerticle(
        WorkerVerticle(), DeploymentOptions()
            .setThreadingModel(ThreadingModel.WORKER)
            .setInstances(1)
    )
}
The logs are from doing the following:
- Hit the HTTP endpoint and receive a response: returns immediately.
- Allow the periodic to fire, then hit the HTTP endpoint and wait for the response: this only returned once the periodic had completed.
- Hit the HTTP endpoint again: returns immediately.
INFO WorkerVerticle - WorkerVerticle started [# 14:34:52.369] [vert.x-worker-thread-0]
INFO MainVerticle - MainVerticle listening on 8888 [# 14:34:52.469] [vert.x-eventloop-thread-0]
INFO MainVerticle - Receive request on /run, currentTime: 2024-09-16T12:34:54.548702Z [# 14:34:54.550] [vert.x-eventloop-thread-0]
INFO WorkerVerticle - Received on eventbus consumer: {"id":-412384914,"runEventCreatedAt":"2024-09-16T12:34:54.550736Z"}, 2024-09-16T12:34:54.608534Z [# 14:34:54.608] [vert.x-worker-thread-1]
INFO MainVerticle - Eventbus response received, responding to client with {"success":true,"runEventResultCreatedAt":"2024-09-16T12:34:54.609044Z"} [# 14:34:54.612] [vert.x-eventloop-thread-0]
INFO WorkerVerticle - Before Periodic, will sleep for 5000 ms, now: 2024-09-16T12:35:02.427843Z [# 14:35:02.431] [vert.x-worker-thread-2]
INFO MainVerticle - Receive request on /run, currentTime: 2024-09-16T12:35:03.234533Z [# 14:35:03.234] [vert.x-eventloop-thread-0]
INFO WorkerVerticle - After Periodic, now: 2024-09-16T12:35:07.432642Z [# 14:35:07.433] [vert.x-worker-thread-2]
INFO WorkerVerticle - Received on eventbus consumer: {"id":-218888931,"runEventCreatedAt":"2024-09-16T12:35:03.234836Z"}, 2024-09-16T12:35:07.434098Z [# 14:35:07.434] [vert.x-worker-thread-2]
INFO MainVerticle - Eventbus response received, responding to client with {"success":true,"runEventResultCreatedAt":"2024-09-16T12:35:07.434333Z"} [# 14:35:07.435] [vert.x-eventloop-thread-0]
INFO MainVerticle - Receive request on /run, currentTime: 2024-09-16T12:35:09.920201Z [# 14:35:09.920] [vert.x-eventloop-thread-0]
INFO WorkerVerticle - Received on eventbus consumer: {"id":1885275369,"runEventCreatedAt":"2024-09-16T12:35:09.920523Z"}, 2024-09-16T12:35:09.921379Z [# 14:35:09.921] [vert.x-worker-thread-3]
INFO MainVerticle - Eventbus response received, responding to client with {"success":true,"runEventResultCreatedAt":"2024-09-16T12:35:09.921809Z"} [# 14:35:09.922] [vert.x-eventloop-thread-0]
gradle.kts
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
import org.gradle.api.tasks.testing.logging.TestLogEvent.*
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    kotlin ("jvm") version "1.7.21"
    application
    id("com.github.johnrengelman.shadow") version "7.1.2"
}

group = "com.example"
version = "1.0.0-SNAPSHOT"

repositories {
    mavenCentral()
}

val vertxVersion = "4.5.10"
val junitJupiterVersion = "5.9.1"
val mainVerticleName = "com.example.starter.MainVerticle"
val launcherClassName = "io.vertx.core.Launcher"
val watchForChange = "src/**/*"
val doOnChange = "${projectDir}/gradlew classes"

application {
    mainClass.set(launcherClassName)
}

dependencies {
    implementation(platform("io.vertx:vertx-stack-depchain:$vertxVersion"))
    implementation("io.vertx:vertx-web")
    implementation("io.vertx:vertx-lang-kotlin")
    implementation("io.vertx:vertx-lang-kotlin-coroutines")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.17.2")
    implementation(kotlin("stdlib-jdk8"))
    testImplementation("io.vertx:vertx-junit5")
    testImplementation("org.junit.jupiter:junit-jupiter:$junitJupiterVersion")
    implementation("org.apache.logging.log4j:log4j-slf4j2-impl:2.23.1")
    implementation("org.apache.logging.log4j:log4j-api:2.23.1")
    implementation("org.apache.logging.log4j:log4j-core:2.23.1")
    implementation("org.apache.logging.log4j:log4j-api-kotlin:1.4.0")
}

val compileKotlin: KotlinCompile by tasks
compileKotlin.kotlinOptions.jvmTarget = "17"

tasks.withType<ShadowJar> {
    archiveClassifier.set("fat")
    manifest {
        attributes(mapOf("Main-Verticle" to mainVerticleName))
    }
    mergeServiceFiles()
}

tasks.withType<Test> {
    useJUnitPlatform()
    testLogging {
        events = setOf(PASSED, SKIPPED, FAILED)
    }
}

tasks.withType<JavaExec> {
    args = listOf("run", mainVerticleName, "--redeploy=$watchForChange", "--launcher-class=$launcherClassName", "--on-redeploy=$doOnChange")
}
log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <Properties>
        <Property name="LOG_PATTERN">%-5level %logger{36} - %msg [# %d{HH:mm:ss.SSS}] [%t]%n</Property>
        <Property name="LOG_PATTERN_SIMPLE">%msg%n</Property>
    </Properties>
    <Appenders>
        <Console name="ConsoleLogSimple" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN_SIMPLE}"/>
        </Console>
        <Console name="ConsoleError" target="SYSTEM_ERR">
            <PatternLayout pattern="${LOG_PATTERN}"/>
        </Console>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <Filters>
                <ThresholdFilter level="warn" onMatch="DENY" onMismatch="NEUTRAL"/>
            </Filters>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
            <AppenderRef level="warn" ref="ConsoleError"/>
        </Root>
    </Loggers>
</Configuration>
Would appreciate any insights!
“There is 1 instance of the worker verticle.”
=> Yes, the default is 1 instance. You can increase the number of instances through the deployment options:
new DeploymentOptions().setInstances(instances) // e.g. 2, 3, 4, etc.
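A minimal Kotlin sketch of what that looks like in practice. One thing to watch: `deployVerticle` rejects an instance count other than 1 when it is handed an already-constructed verticle object (as the question's `main()` does), so the verticle has to be passed as a class, class name or supplier instead. `RunEventConsumer` is a hypothetical stand-in for the question's worker verticle, reduced to the consumer.

```kotlin
import io.vertx.core.AbstractVerticle
import io.vertx.core.DeploymentOptions
import io.vertx.core.ThreadingModel
import io.vertx.core.Vertx

// Hypothetical stand-in for the question's worker verticle (consumer only).
class RunEventConsumer : AbstractVerticle() {
    override fun start() {
        vertx.eventBus().consumer<String>("run-event") { message ->
            // Reply with the thread name so it is visible which worker thread handled it.
            message.reply("handled by ${Thread.currentThread().name}")
        }
    }
}

fun main() {
    val vertx = Vertx.vertx()
    val options = DeploymentOptions()
        .setThreadingModel(ThreadingModel.WORKER)
        .setInstances(2)
    // Pass the class, not an instance, so Vert.x can create both instances itself.
    vertx.deployVerticle(RunEventConsumer::class.java, options)
        .onSuccess { id -> println("deployed $id") }
        .onFailure { err -> err.printStackTrace() }
}
```

Each instance registers its own consumer, and the event bus spreads point-to-point messages across the registered consumers. Note that anything set up in `start()`, such as a `setPeriodic` timer, would then run once per instance, and a message routed to an instance that is busy with its long job would presumably still wait.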
As for the messages sent over the event bus: see my sample for an implementation that covers all events. There is no need to use Json.encode/decode with the event bus.
EventMessageCodec
Sub-events extend the class AppEvent.
Usage with the event bus:
eventBus().request("ids", new YourEvent(), AppEvent.options)
eventBus().<YourEvent>consumer("ids", msg -> {...})
It is simple to use, with no manual serialization or deserialization.
More details are in my microservices sample with Vert.x.
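The linked sample is not reproduced here, so the following is only a guess at the general shape of such a codec, written in Kotlin against the `MessageCodec` interface. `LocalObjectCodec` and the `YourEvent` data class are hypothetical names for illustration, and this version assumes local (non-clustered) delivery only.

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.eventbus.MessageCodec

// Hypothetical event type, standing in for the answer's AppEvent subclasses.
data class YourEvent(val id: Int)

// Hypothetical codec: hands the object through unchanged for local delivery
// and refuses clustered (wire) delivery.
class LocalObjectCodec<T : Any>(private val type: Class<T>) : MessageCodec<T, T> {
    override fun encodeToWire(buffer: Buffer, body: T) {
        throw UnsupportedOperationException("local delivery only")
    }

    override fun decodeFromWire(pos: Int, buffer: Buffer): T {
        throw UnsupportedOperationException("local delivery only")
    }

    // Local sends go through transform(); the same instance reaches the consumer.
    override fun transform(body: T): T = body

    override fun name(): String = "local-" + type.name

    // User-defined codecs must return -1 here.
    override fun systemCodecID(): Byte = -1
}

fun main() {
    val vertx = Vertx.vertx()
    val bus = vertx.eventBus()
    // Register once per type; after that no explicit codec name is needed.
    bus.registerDefaultCodec(YourEvent::class.java, LocalObjectCodec(YourEvent::class.java))
    bus.consumer<YourEvent>("ids") { msg ->
        msg.reply(msg.body().copy(id = msg.body().id + 1))
    }
    bus.request<YourEvent>("ids", YourEvent(1))
        .onSuccess { reply -> println("reply: ${reply.body()}") }
        .onComplete { vertx.close() }
}
```

Because `transform` returns the same instance, sender and consumer share the object, so immutable event types are the safer choice with this approach.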
From the vertx.io documentation:
“Worker verticle instances are never executed concurrently by Vert.x by more than one thread, but can executed by different threads at different times
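To illustrate what that sentence implies for the question, here is a toy model in plain Kotlin. It is not Vert.x code: a single-thread executor stands in for the one-at-a-time execution of a single worker verticle instance, and `simulateSerialQueue` is a made-up name. The long job and the message handler share one queue, so a message that arrives while the job runs is only handled once the job finishes, which matches the logs in the question.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Toy model only: one serial queue per worker verticle instance.
// Returns roughly how long (in ms) the "message" waited before being handled.
fun simulateSerialQueue(jobMillis: Long): Long {
    val queue = Executors.newSingleThreadExecutor()
    try {
        // The periodic job occupies the queue first.
        queue.submit(Runnable { Thread.sleep(jobMillis) })
        // A message arriving while the job runs is queued behind it.
        val enqueuedAt = System.nanoTime()
        val handled = queue.submit(Callable { System.nanoTime() - enqueuedAt })
        return TimeUnit.NANOSECONDS.toMillis(handled.get())
    } finally {
        queue.shutdown()
    }
}

fun main() {
    val waited = simulateSerialQueue(jobMillis = 500)
    println("message waited about $waited ms behind the job")
}
```

If this model is accurate, the consumer and the long job need separate queues to run side by side, for example by deploying the periodic job as its own worker verticle so that it gets its own context.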