I am busy converting a WCF service to gRPC. One of the methods uploads large file(s) via streaming. I am currently using .NET 6.0 for both the server and the client.
I followed the example that is provided from here
I keep on getting the following error,
System.IO.IOException: The request stream was aborted. ---> Microsoft.AspNetCore.Connections.ConnectionAbortedException: The HTTP/2 connection faulted.
When I run the server (Docker container) and the client (WPF) locally, i.e. on my PC, it works. But when I move the server to a pod on my test Kubernetes cluster, I get the error described above, with the following stack trace.
System.IO.IOException: The request stream was aborted. ---> Microsoft.AspNetCore.Connections.ConnectionAbortedException: The HTTP/2 connection faulted. --- End of inner exception stack trace --- at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result) at System.IO.Pipelines.Pipe.GetReadAsyncResult() at System.IO.Pipelines.Pipe.DefaultPipeReader.GetResult(Int16 token) at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2MessageBody.ReadAsync(CancellationToken cancellationToken) at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder
1.StateMachineBox1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token) at Grpc.AspNetCore.Server.Internal.PipeExtensions.ReadStreamMessageAsync[T](PipeReader input, HttpContextServerCallContext serverCallContext, Func
2 deserializer, CancellationToken cancellationToken)
at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder1.StateMachineBox
1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
at Grpc.AspNetCore.Server.Internal.HttpContextStreamReader1.<MoveNext>g__MoveNextAsync|13_0(ValueTask
1 readStreamTask)
at Grpc.Core.AsyncStreamReaderExtensions.ReadAllAsyncCore[T](IAsyncStreamReader1 streamReader, CancellationToken cancellationToken)+MoveNext() at Grpc.Core.AsyncStreamReaderExtensions.ReadAllAsyncCore[T](IAsyncStreamReader
1 streamReader, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
at KEY360.FacadeCore.Services.Key360Stream.UploadFile(IAsyncStreamReader1 requestStream, ServerCallContext context) in KeyStream.cs:line 339 at KEY360.FacadeCore.Services.Key360Stream.UploadFile(IAsyncStreamReader
1 requestStream, ServerCallContext context) in KeyStream.cs:line 339
at KEY360.FacadeCore.Services.Key360Stream.UploadFile(IAsyncStreamReader1 requestStream, ServerCallContext context) in KeyStream.cs:line 404 at Grpc.Shared.Server.ClientStreamingServerMethodInvoker
3.ResolvedInterceptorInvoker(IAsyncStreamReader1 requestStream, ServerCallContext resolvedContext) at Grpc.Shared.Server.ClientStreamingServerMethodInvoker
3.ResolvedInterceptorInvoker(IAsyncStreamReader1 requestStream, ServerCallContext resolvedContext)
Here is my client code & server code.
Client
// (Preceding code elided: it obtains the client wrapper and opens the file to
// upload as a stream, then falls through to the upload call below.)
try
{
    // Rewind seekable streams so the upload starts at the first byte.
    if (stream.CanSeek)
    {
        stream.Seek(0, SeekOrigin.Begin);
    }

    // Values describing the upload, gathered before the header object is built.
    var uploadLength = stream.Length;
    var requestJson = GetRequestStringJSon(transaction);
    var bareFileName = Path.GetFileNameWithoutExtension(fileForUpload);

    var streamInfo = new StreamInfo
    {
        AllowClientCaching = false,
        Exception = null,
        Length = uploadLength,
        Message = requestJson,
        NewDocumentID = newDocumentId,
        FileName = bareFileName,
        Success = true,
        ServerCachingGUID = Guid.Empty.ToString(),
    };

    response = await service.UploadFileAsync(stream, streamInfo, JsonMetaData, ct);
}
catch (Exception ex)
{
    // One handler; the message depends on the kind of failure:
    // cancellation, I/O failure, or anything else.
    var report = ex switch
    {
        OperationCanceledException => "Upload canceled.",
        IOException ioEx => $"I/O Exception: {ioEx.Message}",
        _ => $"Exception: {ex.Message}",
    };
    Console.WriteLine(report);
}
When UploadFileAsync is called, an exception is thrown and execution lands in the catch (Exception ex) block.
/// <summary>
/// Sends <paramref name="data"/> to the server over the client-streaming
/// <c>UploadFile</c> call, one <c>PutRequest</c> per 32 KB chunk, and returns
/// the outcome reported by the server.
/// </summary>
/// <param name="data">Stream to upload; read from its current position to the end.</param>
/// <param name="info">Descriptive header copied into every <c>PutRequest</c>.</param>
/// <param name="metadata">Call headers passed to the gRPC client.</param>
/// <param name="ct">Optional cancellation token; <c>null</c> behaves like <see cref="CancellationToken.None"/>.</param>
/// <returns>
/// A response with <c>Success</c> and <c>DocumentID</c> taken from the server reply;
/// <c>Exception</c> is only populated when the server reports failure.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="data"/> or <paramref name="info"/> is null.</exception>
/// <exception cref="OperationCanceledException">
/// Cancellation was requested. The request stream is NOT completed in that case,
/// so the server is never told that a truncated upload finished normally.
/// </exception>
public async Task<UploadResponse> UploadFileAsync(Stream data, StreamInfo info, Metadata metadata, CancellationToken? ct = null)
{
    ArgumentNullException.ThrowIfNull(data);
    ArgumentNullException.ThrowIfNull(info);

    const int ChunkSizeBytes = 1024 * 32; // 32 KB per message
    var token = ct.GetValueOrDefault();

    using (var call = _client.UploadFile(metadata))
    {
        // The header is identical for every chunk, so it is built once and reused.
        var header = new Protos.StreamInfo
        {
            AllowClientCaching = info.AllowClientCaching,
            Exception = null,
            FileName = info.FileName,
            Length = info.Length,
            Message = info.Message,
            NewDocumentID = info.NewDocumentID,
            ServerCachingGUID = info.ServerCachingGUID,
            Success = info.Success
        };

        var buffer = new byte[ChunkSizeBytes];
        // Bytes sent so far; used as the Offset of the next chunk.
        // NOTE(review): this is an int and wraps for uploads above 2 GiB - confirm
        // the type of PutRequest.Offset in the .proto before widening it.
        int size = 0;
        bool fileRead = false;

        while (true)
        {
            // Abort before doing more work; leaving the using block without
            // CompleteAsync() disposes the call instead of finishing it.
            token.ThrowIfCancellationRequested();

            int total = await data.ReadAsync(buffer, token);
            if (total == 0)
            {
                break;
            }

            fileRead = true;
            await call.RequestStream.WriteAsync(new PutRequest
            {
                // Copies exactly the bytes that were read, without an intermediate array.
                Data = Google.Protobuf.ByteString.CopyFrom(buffer, 0, total),
                Offset = size,
                Info = header
            }, token);

            size += total;
        }

        if (!fileRead)
        {
            // Empty input: still send one header-only message so the server
            // receives the upload description.
            await call.RequestStream.WriteAsync(new PutRequest
            {
                Offset = 0,
                Info = header
            }, token);
        }

        await call.RequestStream.CompleteAsync();
        var done = await call.ResponseAsync;

        if (done.Success) // completed
        {
            return new UploadResponse { Success = true, DocumentID = done.DocumentID, Exception = null };
        }

        return new UploadResponse { Success = false, DocumentID = done.DocumentID, Exception = done.Exception };
    }
}
The server code looks like this:
/// <summary>
/// Receives a client-streamed upload: appends the payload of every
/// <c>PutRequest</c> to a temporary file and captures the descriptive header
/// from the first message that carries one.
/// </summary>
/// <param name="requestStream">Incoming stream of upload chunks.</param>
/// <param name="context">Call context; its cancellation token governs the read loop when present.</param>
/// <returns>
/// An <c>UploadResponse</c>. Any exception raised while receiving is converted
/// into a failed response (DocumentID = -1) rather than propagated.
/// </returns>
public override async Task<UploadResponse> UploadFile(IAsyncStreamReader<PutRequest> requestStream, ServerCallContext context)
{
    _serviceContract.Context = context;

    // Unique path in the temp directory. Path.GetTempFileName() is deliberately
    // not used: it creates an empty file on disk, and appending ".upload" to its
    // name would leave that empty file behind on every call.
    var fileName = Path.Combine(Path.GetTempPath(), $"{Guid.NewGuid():N}.upload");

    try
    {
        // Fallback timeout, only used when no call context is supplied.
        // Disposed on exit so its timer does not linger.
        using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(10));
        var token = context is not null ? context.CancellationToken : cts.Token;

        Facade.UploadStreamInfo uploadStream = null;
        await using var file = File.Create(fileName);

        // THE ERROR HAPPENS HERE, ReadAllAsync, according to the stacktrace.
        await foreach (var data in requestStream.ReadAllAsync(cancellationToken: token))
        {
            if (data.Data.Length > 0)
            {
                try
                {
                    // Write straight from the message's buffer; no intermediate byte[] copy.
                    await file.WriteAsync(data.Data.Memory, token);
                }
                catch (System.Exception ex) when (ex is not OperationCanceledException)
                {
                    // Cleanup of the partial file happens in the outer catch,
                    // after the file handle has been released.
                    throw new RpcException(Status.DefaultCancelled, $"WriteError on Server with error. {ex.Message}");
                }
            }

            // Capture the header once, from the first message that has one.
            if (uploadStream == null && data.Info is { } info)
            {
                uploadStream = new Facade.UploadStreamInfo
                {
                    AllowClientCaching = info.AllowClientCaching,
                    ByteStream = null,
                    FileName = info.FileName,
                    NewDocumentID = info.NewDocumentID,
                    Message = info.Message,
                    Length = info.Length,
                    ServerCachingGUID = info.ServerCachingGUID,
                    Success = info.Success
                };
            }
        }

        await file.FlushAsync(token);
        // Closed explicitly so the code below can work with the file on disk.
        file.Close();
        // some code removed here....
        // this code just move the uploaded file to the correct location, but it never gets here.
    }
    catch (System.Exception ex)
    {
        // The file handle is already disposed here (its scope is the try block),
        // so the partial upload can be removed. Best effort: a cleanup failure
        // must not mask the original error.
        try
        {
            File.Delete(fileName);
        }
        catch (System.Exception cleanupEx) when (cleanupEx is IOException or UnauthorizedAccessException)
        {
            // Ignored on purpose; the original failure is reported below.
        }

        //throw;
        return new UploadResponse { DocumentID = -1, Success = false, Exception = new Protos.Exception { Exceptiontree = ex.InnerException?.Message, Message = ex.Message, Type = ExceptionType.Error, Stacktrace = ex.StackTrace } };
    }

    return new UploadResponse { DocumentID = -1, Success = false, Exception = new Protos.Exception { Message = "UploadFile failed due to unknown reason?", Type = ExceptionType.Validation } };
}
SSG_CM is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.