I am using an axum server to handle WebSocket connections. The service reads messages from a Kafka topic, and if a WebSocket connection has been established for a message's session, the message is relayed to that connection.
I run into problems when load testing the application with 5000 connections (4 CPUs and 8 GB RAM): clients get I/O timeout errors.
I am benchmarking this service against a Java Spring Boot implementation with the same configuration, and the Java service works fine with very low latency.
Here are the important code snippets; they mostly follow the standard examples:
lazy_static! {
    pub static ref ACTIVE_SESSIONS: DashMap<String, SplitSink<WebSocket, axum::extract::ws::Message>> =
        DashMap::new();
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
let consumer_handles = (0..num_partitions)
    .map(|_| {
        tokio::spawn(run_async_processor(
            broker_url.to_owned(),
            random_group_id.to_owned(),
            topic.to_owned(),
            username.to_owned(),
            password.to_owned(),
            app_config.kafka.consumer.enable_auto_commit.to_owned(),
            offset.to_owned(),
        ))
    })
    .collect::<Vec<_>>();
match TcpListener::bind(&ADDR).await {
    Ok(listener) => {
        let local_addr = listener.local_addr().expect("Failed to get local address");
        tracing::info!("server is running on port {}", local_addr.port());
        axum::serve(listener, app.into_make_service())
            .tcp_nodelay(true)
            .await
            .expect("Server failed to start");
    }
    Err(e) => {
        tracing::error!("Failed to initiate the listener: {}", e);
        std::process::exit(1);
    }
}
The WebSocket handler function adds an entry to ACTIVE_SESSIONS. The Kafka consumer, in turn, checks whether each consumed message belongs to an already established connection and, if so, sends it to that connection. I am new to Rust and axum.
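A simplified sketch of this relay flow, using only the standard library, looks roughly like the following. It is an illustration under assumptions, not the real code: `SessionRegistry`, `register`, `unregister`, and `relay` are hypothetical names, a `Mutex<HashMap>` stands in for the `DashMap`, and an `mpsc` channel stands in for the WebSocket `SplitSink`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for ACTIVE_SESSIONS: maps a session id to the
/// sending half of a channel that feeds that session's connection.
#[derive(Clone, Default)]
pub struct SessionRegistry {
    inner: Arc<Mutex<HashMap<String, Sender<String>>>>,
}

impl SessionRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    /// What the WebSocket handler would do on connect: store a sender
    /// for the session and keep the receiver to forward to the socket.
    pub fn register(&self, session_id: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.inner
            .lock()
            .unwrap()
            .insert(session_id.to_owned(), tx);
        rx
    }

    /// What the handler would do when the connection closes.
    pub fn unregister(&self, session_id: &str) {
        self.inner.lock().unwrap().remove(session_id);
    }

    /// What the Kafka consumer would do per message. Returns `true`
    /// only if a live session accepted the payload.
    pub fn relay(&self, session_id: &str, payload: &str) -> bool {
        // Clone the sender so the map lock is released before sending.
        let sender = self.inner.lock().unwrap().get(session_id).cloned();
        match sender {
            Some(tx) => tx.send(payload.to_owned()).is_ok(),
            None => false,
        }
    }
}
```

In this sketch the consumer never holds the map lock while sending, whereas my real code sends directly through the sink stored in the map.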
Can I get some help on how to make this better?