I have a server-side streaming RPC server and the stream returned by the function is meant to “drop” intermediate updates (in the sense of not returning the from .next()
) based on how fast .next()
is called, but it seems like the server is calling .next()
in a tight loop regardless of how fast or slow the client is able to consume from it. How can I apply backpressure from the client-side so that .next()
calls are limited by the rate at which the client can consume?
I tried setting the buffer size on the client side, but that doesn’t do anything (I can’t find anywhere to set it on the server side).
Sorry for the large MWE. tonic
code isn’t particularly concise.
Here’s an example server and client where the server increments a counter and attempts to publish its value to any subscribers once-per-second, but the client only consumes messages once every two seconds. When I run this, the client seems to fall indefinitely behind. In other words, the queue seems completely unbounded.
I see this same behavior when the client is fast to consume but I use toxiproxy to throttle the connection, so this isn’t being caused by an unbounded client-side cache.
// streaming.proto
syntax = "proto3";
package streaming;
service Counter {
rpc StreamCounter(Empty) returns (stream CounterResponse);
}
message Empty {}
message CounterResponse {
int32 value = 1;
}
// server.rs
use futures_util::stream;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::{watch, Mutex};
use tokio::time::{sleep, Duration};
use tokio_stream::{Stream, StreamExt};
use tonic::async_trait;
use tonic::{transport::Server, Request, Response, Status};
pub mod streaming {
tonic::include_proto!("streaming");
}
use streaming::counter_server::{Counter, CounterServer};
use streaming::{CounterResponse, Empty};
pub struct MyCounter {
tx: Arc<watch::Sender<i32>>,
}
#[async_trait]
impl Counter for MyCounter {
type StreamCounterStream = Pin<Box<dyn Stream<Item = Result<CounterResponse, Status>> + Send>>;
async fn stream_counter(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::StreamCounterStream>, Status> {
let rx = Arc::new(Mutex::new(self.tx.subscribe()));
let output = stream::repeat(())
.then(move |()| {
let rx = rx.clone();
async move {
let mut rx = rx.lock().await;
let result = rx.changed().await;
match result {
Ok(()) => {
let value = *rx.borrow();
drop(rx);
println!("Sending: {}", value);
Some(Ok(CounterResponse { value }))
}
Err(_) => None,
}
}
})
.map_while(|x| x);
Ok(Response::new(Box::pin(output)))
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, _rx) = watch::channel(0);
let tx = Arc::new(tx);
let my_counter = MyCounter { tx: tx.clone() };
tokio::spawn(async move {
let mut counter = 0;
loop {
sleep(Duration::from_secs(1)).await;
counter += 1;
tx.send(counter).unwrap();
}
});
let addr = "[::1]:50051".parse()?;
Server::builder()
.add_service(CounterServer::new(my_counter))
.serve(addr)
.await?;
Ok(())
}
// client.rs
use streaming::counter_client::CounterClient;
use streaming::Empty;
use tokio_stream::StreamExt;
use tonic::transport::Endpoint;
use tonic::Request;
pub mod streaming {
tonic::include_proto!("streaming");
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client =
CounterClient::connect(Endpoint::from_static("http://[::1]:50051").buffer_size(Some(2)))
.await?;
let request = Request::new(Empty {});
let mut stream = client.stream_counter(request).await?.into_inner();
while let Some(response) = stream.next().await {
match response {
Ok(counter_response) => {
println!("Received: {}", counter_response.value);
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; // Simulate slow processing
}
Err(e) => {
eprintln!("Error: {}", e);
break;
}
}
}
Ok(())
}