Question: I’m working with gRPC in Dart, specifically implementing server-side streaming. My goal is to modify the stream by adding events from another file using a reference to the StreamController. However, I’m having trouble getting this to work, and I’m not sure how to properly update the stream with new events from an external source.
Here’s my current setup:
helloword.proto
syntax = "proto3";

service WelcomeProto {
  rpc ServerSideList(HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}
WelcomeProtoService.dart
class WelcomeProtoService extends WelcomeProtoServiceBase {
  StreamController<HelloResponse> controller = StreamController<HelloResponse>();

  // Server-side streaming RPC
  @override
  Stream<HelloResponse> serverSideList(ServiceCall call, HelloRequest request) {
    int counter = 1;
    print("Request received: ${request.name}");
    Timer.periodic(Duration(seconds: 1), (timer) {
      if (counter > 3) {
        timer.cancel();
      } else {
        controller.add(HelloResponse()..message = 'Hello, ${request.name} $counter');
        print("controller type: ${controller.runtimeType}");
        counter++;
      }
    });

    // Handling stream pause and cancellation
    controller.onPause = () => print("Stream paused");
    controller.onCancel = () {
      print("Stream canceled");
      controller = StreamController<HelloResponse>();
    };
    return controller.stream;
  }

  void modifyResponse(HelloResponse response) {
    print("Adding data to stream...");
    print("Is controller closed? ${controller.isClosed}");
    print("Has controller listeners? ${controller.hasListener}");
    controller.add(response);
  }

  void closeStream() {
    controller.close();
  }
}
makecall.dart
This file calls modifyResponse() from outside the service file to add events to the stream.
void main(List<String> arguments) {
  final inputService = WelcomeProtoService();
  if (arguments.isEmpty) {
    print('Please provide input');
    return;
  }
  if (arguments[0] == 'close') {
    print('Closing the server');
    inputService.closeStream();
    return;
  }
  inputService.modifyResponse(HelloResponse()..message = arguments[0]);
}
Problem:
I’m trying to add new events to the StreamController from makecall.dart using the modifyResponse method, but it doesn’t work as expected. Although I can see that modifyResponse() is being called, the new messages aren’t added to the ongoing stream.
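One possible reading of this symptom, sketched below with plain dart:async and no gRPC dependency: makecall.dart constructs its own WelcomeProtoService, and every instance owns a separate StreamController, so an event added through one instance never reaches a listener on another. FakeService and demoSeparateInstances are hypothetical stand-ins, not part of the setup above, and the controller is a synchronous broadcast controller only so the demo needs no awaits. Note also that each dart run invocation is a separate process, so it shares no memory with a running server at all.

```dart
import 'dart:async';

// Hypothetical stand-in for WelcomeProtoService: each instance owns its own
// controller, just like the `controller` field in the real class.
class FakeService {
  // Assumption for the demo only: a sync broadcast controller, so events are
  // delivered during add() and events with no listener are simply dropped.
  final StreamController<String> controller =
      StreamController<String>.broadcast(sync: true);

  void modifyResponse(String message) => controller.add(message);
}

// Returns what a listener on `serving` receives when events are added through
// `serving` itself and through an unrelated `other` instance.
List<String> demoSeparateInstances() {
  final serving = FakeService(); // plays the instance registered with the server
  final other = FakeService(); // plays the instance created in makecall.dart
  final received = <String>[];
  final sub = serving.controller.stream.listen(received.add);

  other.modifyResponse('from other instance'); // different controller
  serving.modifyResponse('from serving instance'); // same controller

  sub.cancel();
  return received;
}

void main() {
  print(demoSeparateInstances());
}
```

In this sketch only the event added through the serving instance should show up in the returned list; the one added through the second instance has no path to the listener.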
Steps to Reproduce:
- Run the gRPC server and client.
- Add a new message event via makecall.dart:
  dart run ./lib/makecall.dart "New message"
How can I correctly add new events to a server-side streaming gRPC response using a StreamController from an external source? Is there a better way to handle this, or do I need to restructure the stream or controller setup?
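A minimal sketch of one restructuring that might fit, assuming the new events can be delivered into the server process itself (for example through a second RPC, since a separate dart run process cannot touch the server's memory). It uses only dart:async; EventHub, publish, events, and demoSharedHub are hypothetical names, serverSideList here is a simplified stand-in for the real RPC method, and sync: true is chosen only so the demo runs without awaits.

```dart
import 'dart:async';

// Hypothetical hub: a single broadcast controller shared by all code running
// inside the server process.
class EventHub {
  EventHub._();
  static final EventHub instance = EventHub._();

  // Broadcast, so several response streams can listen at the same time.
  final StreamController<String> _controller =
      StreamController<String>.broadcast(sync: true);

  Stream<String> get events => _controller.stream;

  void publish(String message) => _controller.add(message);
}

// Simplified stand-in for the streaming RPC: each call returns its own view
// of the shared event stream instead of owning a private controller.
Stream<String> serverSideList(String name) =>
    EventHub.instance.events.map((message) => 'Hello, $name: $message');

// Hypothetical driver: two "clients" subscribe, then one event is published
// from elsewhere in the same process.
List<String> demoSharedHub() {
  final received = <String>[];
  final first = serverSideList('Alice').listen(received.add);
  final second = serverSideList('Bob').listen(received.add);

  EventHub.instance.publish('New message');

  first.cancel();
  second.cancel();
  return received;
}

void main() {
  print(demoSharedHub());
}
```

In this sketch both subscribers should receive the published message. In a real service the mapped values would be HelloResponse objects, and publish would be called from whatever entry point carries the external input into the server process.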