I am seeing pretty poor performance with some code which moves data from one datastore to a SQL database.
Currently I manage to process about 200 messages / second.
I initially wrote this utility in Python and then rewrote it in Rust, because I thought part of the reason for the poor performance was Python itself. (The code does a number of other potentially expensive calculations, which might be significantly faster in a compiled language such as Rust than in Python, where working with Python objects carries overhead. It turns out this was not the bottleneck.)
It is a bit tricky to give a useful MWE for this. Let me present something, then the nuances can be discussed later.
```rust
fn main() -> Result<(), ()> {
    println!("connecting to Postgres...");
    let pool = futures::executor::block_on(
        sqlx::postgres::PgPoolOptions::new()
            .max_connections(5)
            // credentials and host were redacted in the original post
            .connect("postgresql://postgres:<password>@<host>/postgres"),
    )
    .expect("failed to connect to Postgres");
    println!("connected to Postgres");

    let html_data = "<div>some long string read from file, ideally a few kb in size</div>";

    let datetime_start = chrono::offset::Utc::now();

    // build the future for the insert of one html_data row
    let query_result = sqlx::query("insert into html_data (html_data) values ($1)")
        .bind(html_data)
        .fetch_optional(&pool);
    // drive that future to completion on the current thread
    let _optional_row = futures::executor::block_on(query_result).expect("insert html_data failed");

    let datetime_stop = chrono::offset::Utc::now();
    let duration = datetime_stop - datetime_start;
    // seconds -> milliseconds is a factor of 1e3; nanoseconds -> milliseconds is 1e-6
    let duration = 1.0e+3 * duration.num_seconds() as f64 + 1.0e-6 * duration.subsec_nanos() as f64;
    println!("insert html_data: {duration} ms");

    Ok(())
}
```
What this code does is simple:
- It establishes a connection to a Postgres database
- It inserts a row into a table
- The table has two columns: the primary key `id` column and `html_data`, which is of type `varchar`
- The database operations are all synchronous: `futures::executor::block_on` is used to convert an asynchronous interface into a synchronous one
The round-trip time appears to be around 10 ms. With one round trip per message, that caps throughput at roughly 100 messages / sec, the same order of magnitude as the rate I observe.
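If every message costs exactly one database round trip, and the next request is only sent once the previous reply has arrived, throughput is bounded by the number of in-flight requests divided by the round-trip time (Little's law). A small sketch of that arithmetic follows; the function name is hypothetical, and the figure of 5 in flight only mirrors the `max_connections(5)` setting in the code above, assuming each pooled connection carries one outstanding query at a time.

```rust
/// Upper bound on messages per second when every message costs one
/// database round trip and at most `in_flight` requests are outstanding.
fn max_messages_per_second(round_trip_ms: f64, in_flight: u32) -> f64 {
    // Little's law: throughput = concurrency / latency (latency converted to seconds)
    f64::from(in_flight) * 1000.0 / round_trip_ms
}

fn main() {
    // One request at a time, as when block_on is called after every query.
    let sequential = max_messages_per_second(10.0, 1);
    // Up to 5 requests in flight (assumption: one per pooled connection).
    let overlapped = max_messages_per_second(10.0, 5);
    println!("sequential: {sequential} msg/s");
    println!("5 in flight: {overlapped} msg/s");
}
```

The only lever that helps without making the database itself faster is raising the number of requests in flight, which is what the asynchronous approach discussed below is about.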
Fortunately, many of the database calls could be made asynchronous. Sometimes I have a branch that depends on data returned by the database, but in many cases I just insert data, as this example shows.
To convert this code into something that uses asynchronous I/O, my current understanding is that I can do the following:
- Instead of blocking on the completion of the future returned by `sqlx::query(...).fetch_optional()`, I could do something else with that future and block on it later.
The problem is, I don’t know what I should do with it.
The type of `query_result` is `impl Future<Output = Result<<Postgres as Database>::QueryResult, Error>>`.
Perhaps I misunderstand, but I don't think creating `query_result` causes it to do anything.
If I understand correctly, calling `executor::block_on` on `query_result` will:
- start the function running
- send a network request to the database
- wait for the database to respond
- complete the future and return any return values
- only then does `executor::block_on` finish
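That understanding matches how Rust futures work: a future is lazy, so constructing it runs none of its body, and only polling it (which `block_on` does in a loop) makes progress. The std-only sketch below illustrates this with a hypothetical `fake_query` stand-in and a minimal hand-written `block_on`; it is not the `futures` crate's implementation. A real sqlx query would typically return `Poll::Pending` on its first poll while waiting for the server, and the loop would then park the thread until the waker fires.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll; if not ready, park until woken, then poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Records whether the body of `fake_query` has started running.
static STARTED: AtomicBool = AtomicBool::new(false);

/// Hypothetical stand-in for a database query (no real I/O).
async fn fake_query() -> u32 {
    STARTED.store(true, Ordering::SeqCst);
    42
}

fn main() {
    let query_result = fake_query(); // only creates the future
    assert!(!STARTED.load(Ordering::SeqCst)); // the body has not run yet
    let value = block_on(query_result); // the first poll runs the body
    assert!(STARTED.load(Ordering::SeqCst));
    println!("got {value}");
}
```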
What I actually want is something more like this:
- start the function running
- send network request to database
- do not wait for a response, return some kind of future which I can block on at a later time
However, I am not sure whether such a thing is possible.
Basically, it would be nice to have something like a “fire and forget” request (except that, rather than forgetting about it entirely, I could check later that it completed).
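Starting a request now and collecting its result later is what spawning provides. In async Rust this usually means handing the future to a runtime: `tokio::spawn(fut)` returns a `JoinHandle` that can be `.await`ed later, and the spawned task makes progress even before anyone awaits the handle. Alternatively, several futures can be awaited together with `futures::future::join_all`, which only makes progress while it is itself being awaited. Because no async runtime is available in a std-only sketch, the example below uses OS threads as a stand-in: `fake_insert` and `fire_inserts` are hypothetical placeholders that sleep for one simulated round trip instead of talking to Postgres. The point is the shape: fire all requests, do other work, then join the handles and check each result.

```rust
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for one INSERT: sleeps for one simulated round trip.
fn fake_insert(row: u32, round_trip: Duration) -> Result<u32, String> {
    thread::sleep(round_trip);
    Ok(row)
}

/// Starts every insert immediately and returns handles to check later.
fn fire_inserts(rows: u32, round_trip: Duration) -> Vec<JoinHandle<Result<u32, String>>> {
    (0..rows)
        .map(|row| thread::spawn(move || fake_insert(row, round_trip)))
        .collect()
}

fn main() {
    let round_trip = Duration::from_millis(10);
    let start = Instant::now();

    // "Fire": all requests are in flight before we wait on any of them.
    let handles = fire_inserts(20, round_trip);

    // ... other work could happen here ...

    // "Check later": join each handle and surface any error.
    let mut ok = 0;
    for handle in handles {
        match handle.join().expect("insert thread panicked") {
            Ok(_) => ok += 1,
            Err(e) => eprintln!("insert failed: {e}"),
        }
    }
    println!(
        "{ok} inserts in {:?} (one at a time would take about {:?})",
        start.elapsed(),
        round_trip * 20
    );
}
```

With 20 simulated inserts the elapsed time should be roughly one round trip rather than twenty. Against a real sqlx pool, the number of queries actually in flight would presumably be capped by `max_connections`, so with the settings in the question at most 5 would overlap while the rest wait for a free connection.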