I am using Rocket + Capnproto where I receive a request from a Web API(Rocket) and have to send it to another server via RPC (Capnproto)
here my main Rocket setup
#[rocket::main]
async fn main() {
rocket::build()
.mount("/API",routes![invoke])
.launch()
.await.ok();
}
Here my route method
#[post("/API", data = "<request>")]
async fn invoke(request: api_request::APIRequest<'_>)Result<Json<api_response::ApiResponse>, Json<api_response::ApiResponseError>>{
let result = capnp_rpc::client::run_client(String::from("Hello World")).await;
}
And here my Capnproto code, this is pretty much the hello world example they have, this works fine separately but once I put it on my Rocket projects fails. As you can see all is wrap in Tokio library.
pub async fn run_client( message: String ) ->Result<String, Box<dyn std::error::Error>> {
let server_addr : String = "127.0.0.1:4000".to_string();
let addr = server_addr
.to_socket_addrs().unwrap()
.next()
.expect("could not parse address");
rocket::tokio::task::LocalSet::new()
.run_until( async move {
let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
stream.set_nodelay(true).unwrap();
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let hello_world: hello_world::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
rocket::tokio::task::spawn_local(rpc_system);
let mut request = hello_world.say_hello_request();
request.get().init_request().set_name(&message[..]);
let reply = request.send().promise.await?;
let reply_message = reply.get()?.get_reply()?.get_message()?.to_str()?;
println!("received: {}", reply_message);
Ok(reply_message.to_string())
}).await
}
here the error I am getting
future cannot be sent between threads safely
within `{async block@src/main.rs:91:1: 91:42}`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `{async block@src/main.rs:91:1: 91:42}: std::marker::Send`
required for the cast from `Pin<Box<{async block@src/main.rs:91:1: 91:42}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>
From what I was able to investigate, Rocket has its own threat-worker and seem that this code is trying to start another threat-worker. I have tried few things from another questions trying to use Mutex, but could not make it work.
Not sure how to wrap this code so it can work under Rocket main threat-worker.