I’m building a multithreaded Rust app that works with database items, so I’ve chosen diesel-async with the deadpool connection manager to handle DB connections efficiently and in a thread-safe way. However, I’ve hit a bump: apparently one thread blocks another thread’s attempt to get a DB connection from the pool.
Here’s what I use:
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    // Load environment variables from .env
    dotenv().expect("Failed to load .env file");

    // Initialize DB pool to be reused among all tasks
    let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set.");
    let config = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
    let Ok(pool) = Pool::builder(config).build() else {
        panic!("Cannot create a DB pool!");
    };
    pool.resize(8);

    let mut tasks = JoinSet::new();
    // create and maintain a WSS connection to web backend to fetch new item requests
    let p = pool.clone();
    tasks.spawn(async move { loop_save_incoming_new_items(p).await });
    // initialize items that are already tracked
    let p = pool.clone();
    tasks.spawn(async move { loop_initialize_items(p).await });
    // manage items regularly
    let p = pool.clone();
    tasks.spawn(async move { loop_manage_items_regularly(p).await });

    if let Some(res) = tasks.join_next().await {
        println!("Task finished with result: {:?}", res);
    }
}
Now, those three functions that the threads run contain a bunch of other nested functions; in pseudocode:
async fn loop_save_incoming_new_items(db_pool: Pool<AsyncPgConnection>) {
    loop {
        // fetch a new item from a source
        let item = wss_get_new_item().await;
        // convert it to the format this app expects
        let item_converted = convert(&item);
        // store it in the DB
        db_store_new_item(&item_converted, &db_pool).await;
        // wait until the next fetch attempt
        std::thread::sleep(Duration::from_secs(30));
    }
}
async fn loop_initialize_items(db_pool: Pool<AsyncPgConnection>) {
    loop {
        // get an uninitialized item, if there is one
        let Some(new_item) = db_get_uninitialized_item(&db_pool).await else {
            std::thread::sleep(Duration::from_secs(60));
            continue;
        };
        // do some mumbo-jumbo on it
        let initialized_item = initialize_item(&new_item);
        // store it back in the DB
        db_update_stored_item(&initialized_item, &db_pool).await;
    }
}
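The third task, loop_manage_items_regularly, looks roughly the same (the helper names here are just placeholders for the real ones):

async fn loop_manage_items_regularly(db_pool: Pool<AsyncPgConnection>) {
    loop {
        // fetch the currently tracked items
        let items = db_get_managed_items(&db_pool).await;
        // run the regular maintenance on each of them and store the results
        for item in items {
            let managed = manage_item(&item);
            db_update_stored_item(&managed, &db_pool).await;
        }
        // wait until the next maintenance round
        std::thread::sleep(Duration::from_secs(60));
    }
}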
Now, functions with the db_ prefix all start by calling a helper that fetches a connection from the pool:
async fn get_connection_from_pool(
    db_pool: &Pool<AsyncPgConnection>,
) -> Result<Object<AsyncPgConnection>, MyError> {
    println!("Getting a new DB connection from pool, status: {:?}...", db_pool.status());
    let start = Instant::now();
    match db_pool.get().await {
        Ok(a) => {
            println!("Success! It only took {:?}", start.elapsed());
            Ok(a)
        }
        Err(e) => {
            println!("Unable to get DB connection from the pool: {:?}", e);
            Err(MyError::DB)
        }
    }
}
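For completeness, MyError is just my own error enum; the relevant part is roughly:

// Simplified; the real enum has more variants
#[derive(Debug)]
pub(crate) enum MyError {
    DB,
}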
So a function to store new items looks like this:
pub(crate) async fn db_store_new_item(
    new_item: &Item,
    db_pool: &Pool<AsyncPgConnection>,
) -> Result<(), MyError> {
    let mut conn = get_connection_from_pool(db_pool).await?;
    match diesel::insert_into(managed_items)
        .values(new_item)
        .execute(&mut conn)
        .await
    {
        Ok(_) => Ok(()),
        Err(e) => {
            eprintln!("Failed to insert new item: {:?}", e);
            Err(MyError::DB)
        }
    }
}
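The read-side functions follow the same pattern: grab a connection, run one query, and let the connection drop back into the pool at the end of the function (which is also what my last question at the bottom is about). For example, db_get_uninitialized_item is roughly this (the initialized column is illustrative):

pub(crate) async fn db_get_uninitialized_item(
    db_pool: &Pool<AsyncPgConnection>,
) -> Option<Item> {
    // Same pattern: borrow a connection from the pool just for this one query
    let mut conn = get_connection_from_pool(db_pool).await.ok()?;
    managed_items
        .filter(initialized.eq(false)) // `initialized` is an illustrative column name
        .first::<Item>(&mut conn)
        .await
        .optional()
        .unwrap_or_else(|e| {
            eprintln!("Failed to fetch an uninitialized item: {:?}", e);
            None
        })
    // `conn` is dropped here, which hands the connection back to the pool
}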
And now, the problem: when I run this, the threads apparently collide. The thread that inserts new items into the DB, and sleeps 30 seconds after each insert, effectively blocks the other threads from getting their connections from the pool. Only once that thread gets through inserting a new item, i.e. once db_pool is ‘touched’ in that thread, can the other threads get their connections. In the terminal it looks like this:
DB pool status: Status { max_size: 8, size: 3, available: 3, waiting: 0 }
Success! It only took 1.200926ms
Getting a new DB connection from pool, status: Status { max_size: 8, size: 3, available: 3, waiting: 0 }
Getting a new DB connection from pool, status: Status { max_size: 8, size: 3, available: 2, waiting: 0 }
Success! It only took 1.300275ms
Success! It only took 30.721683884s
Any clue what the root cause might be, please?
I’ve tried:
- adding Arc<> around Pool<>, but in vain (roughly as sketched after this list; also, Pool has an internal Arc, so that had little hope of success to begin with)
- increasing the size of the DB pool to 8, but according to the db_pool.status() output, the pool size and available connections are not the issue
- increasing the number of CPUs/cores in the VM I run this in (Ubuntu 24, 2 CPUs x 2 cores)
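For reference, the Arc attempt looked roughly like this (with the task function signatures changed to take Arc<Pool<AsyncPgConnection>> accordingly); it behaved exactly the same:

use std::sync::Arc;

// in main(), instead of cloning the Pool directly:
let pool = Arc::new(pool);

let p = Arc::clone(&pool);
tasks.spawn(async move { loop_save_incoming_new_items(p).await });
// ...and the same for the other two tasks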
I’m at my wits’ end now; any help is greatly appreciated. Thanks!
Also, is this way of propagating the pool down to the lowest-level DB functions, which take a new connection from the pool, use it immediately, and discard it, correct/best practice? 🙂