I’ve been suffering implementing grpc bidirectional stream.
I’m expecting the server keeps request stream open until there’s an explicit stream dispose… though for some reasons, my server keeps getting an exception{“Can’t read messages after the request is complete.”}. If you see my code below, client is sending 1000 requests but server’s requestStream.ReadAllAsync()
throws an exception after a few iterations which don’t make sense to me.
Thanks for the help!
This is my server side implementation of bidirectional stream
private async Task HandleUpdate(IAsyncStreamReader<UpdateRequest> requestStream,
IServerStreamWriter<UpdateResponse> responseStream, CancellationToken cancellationToken)
{
async Task ProcessRequests()
{
try
{
await foreach (var request in requestStream.ReadAllAsync(cancellationToken))
{
_logger.LogInformation($"Update: {request}");
}
}
catch (Exception e)
{
}
}
async Task PublishResponses()
{
try
{
while (true)
{
// WIll add publish function
await Task.Delay(2000, cancellationToken);
}
}
catch (TaskCanceledException)
{
}
catch (Exception e)
{
}
}
try
{
await Task.WhenAny(ProcessRequests(), PublishResponses());
}
catch (Exception ex)
{
_logger.LogError(ex, "Exception");
}
}
And this is a client
using (var call = client.StreamTableUpdate())
{
try
{
// Writing to the request stream
var writeTask = Task.Run(async () =>
{
for (int i = 0; i < 1000; i++)
{
try
{
var request = new UpdateRequest();
await call.RequestStream.WriteAsync(request);
}
catch (Exception e)
{
// Log the exception here...
}
}
await call.RequestStream.CompleteAsync();
});
await foreach (var response in call.ResponseStream.ReadAllAsync())
{
// To DO
}
await writeTask;
}
catch (Exception e)
{
_logger.LogError(e, "An error occurred while processing the stream.");
}
}