I got a piece of code which is largely synchronous code running in different threads.
communication is mostly being done via std::sync::mpsc::channel, but now i need to have some other code passing messages to elasticsearch, which only seems to provide an async interface.
my problem is, that i cannot seem to establish a channel from sync-code to async-code, even when using tokio::sync::mpsc::channel. the channel on the sender side is always closed.
to reproduce the problem i created a minimal version of how the code is structured:
use tokio::sync::mpsc::{Receiver, Sender};
use std::{thread, time};
#[derive(Clone, Debug)]
pub struct Message {
pub m: i64,
}
pub fn syncfunc(
tx: Sender<Message>
) {
println!("starting sync-func");
loop {
for x in 1..10_000_000 {
let m = Message{m:x};
if tx.is_closed(){
println!("channel is closed.. not sending data.");
}
else {
match tx.blocking_send(m){
Ok(d) => d,
Err(e) => println!("Error sending data: {}", e ),
};
}
thread::sleep(time::Duration::from_millis(1000));
}
}
}
async fn asyncfunc(
mut rx: Receiver<Message>,
) {
println!("starting async-func");
loop {
let rcv = rx.recv().await.unwrap();
println!("got data: {:?}", rcv.m);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// channel
let (tx, rx): (Sender<Message>, Receiver<Message>) =
tokio::sync::mpsc::channel(16384);
// async-part
tokio::task::spawn(async move {
asyncfunc(rx).await;
});
// sync-part
//let out_x = tx.clone();
tokio::task::spawn_blocking(move || {
syncfunc(tx);
});
Ok(())
}
The output of this looks like
starting async-func
starting sync-func
channel is closed.. not sending data.
channel is closed.. not sending data.
channel is closed.. not sending data.
I am fairly new to rust, so i might overlook something obvious, but here is what i tried so far.
- using unbounded_channels
- spawning the async-function in another separate runtime
- using an Arc<Mutex<Receiver> to hold the receiver
- using a struct for the receiver, holding the channel and having the async_receive as a function of it referring to the channel via self.chan.recv().await…
my suspicion is, that somehow the reference to the channel inside the async-function is immediately dropped when calling an ‘await’ inside the async-function, but i can’t seem to figure out how to avoid that.
any help would be greatly appreciated.