I have a bidirectional (bidi) streaming RPC. The server looks like this:
@Slf4j
@GrpcService(interceptors = {GrpcLoggingInterceptor.class})
public class GrpcServer extends MyServiceGrpc.MyServiceImplBase {

    // Maps each client ID to its StreamObserver, so that when the server
    // needs to initiate a message to a client, it knows which
    // StreamObserver to send it on.
    @Autowired
    private MyConnectionRegistry registry;

    @Override
    public StreamObserver<ClientToServerMessage> myChannel(
            StreamObserver<ServerToClientMessage> responseObserver) {
        log.info("New client connected to *GRPC* server");
        // New client connected, create a new stream observer for this client.
        return new MyStreamObserver(registry, responseObserver);
    }
}
The observer looks like this:
@Slf4j
public class MyStreamObserver implements StreamObserver<ClientToServerMessage> {

    private final MyConnectionRegistry registry;
    private final StreamObserver<ServerToClientMessage> responseObserver;

    public MyStreamObserver(MyConnectionRegistry registry,
            StreamObserver<ServerToClientMessage> responseObserver) {
        this.registry = registry;
        this.responseObserver = responseObserver;
    }

    @Override
    public void onNext(ClientToServerMessage clientToServerMessage) {
        log.info("Got a message!");
        // Grab the client ID from the message and process it.
        // Add clientId + responseObserver to the registry, so that if the
        // server wants to initiate a message to a client, it knows which
        // stream to use.
        // Send an ack back on the responseObserver.
    }

    @Override
    public void onError(Throwable throwable) {
        log.error("something bad happened", throwable);
    }

    @Override
    public void onCompleted() {
        log.info("all done");
    }
}
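To make the registry idea in the comments concrete, here is a minimal sketch of what such a client-ID-to-stream map might look like. This is not the actual MyConnectionRegistry: the names RegistrySketch, ConnectionRegistry, register, unregister and lookup are hypothetical, and the nested StreamObserver interface is only a stand-in for io.grpc.stub.StreamObserver so that the sketch compiles without gRPC on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class RegistrySketch {

    /** Stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks). */
    public interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical registry: client ID -> the stream used to reach that client. */
    public static class ConnectionRegistry<T> {
        private final Map<String, StreamObserver<T>> streams = new ConcurrentHashMap<>();

        public void register(String clientId, StreamObserver<T> stream) {
            streams.put(clientId, stream);
        }

        /** Call this from onError/onCompleted so stale streams do not accumulate. */
        public void unregister(String clientId) {
            streams.remove(clientId);
        }

        public Optional<StreamObserver<T>> lookup(String clientId) {
            return Optional.ofNullable(streams.get(clientId));
        }

        public int size() {
            return streams.size();
        }
    }

    public static void main(String[] args) {
        ConnectionRegistry<String> registry = new ConnectionRegistry<>();
        List<String> delivered = new ArrayList<>();

        // A fake response stream that just records what the server sends.
        StreamObserver<String> stream = new StreamObserver<String>() {
            @Override public void onNext(String value) { delivered.add(value); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        };

        registry.register("client-1", stream);

        // Server-initiated message: look the stream up by client ID.
        registry.lookup("client-1").ifPresent(s -> s.onNext("hello from server"));
        System.out.println("delivered: " + delivered);

        registry.unregister("client-1");
        System.out.println("registered streams: " + registry.size());
    }
}
```

With the real gRPC StreamObserver, individual observers are not thread-safe, so if the server pushes messages from a different thread than the one handling onNext, calls on the same responseObserver would need to be synchronized.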
On the client side, I create an async stub (the tooling doesn’t generate a blocking stub method for bidi streams), and the client loops through 5 messages, calling requestObserver.onNext(msg) for each one.
Expected behaviour:
New client connected to *GRPC* server
Got a message!
Got a message!
Got a message!
Got a message!
Got a message!
all done
Actual behaviour:
New client connected to *GRPC* server
Got a message!
New client connected to *GRPC* server
Got a message!
New client connected to *GRPC* server
Got a message!
New client connected to *GRPC* server
Got a message!
New client connected to *GRPC* server
Got a message!
all done
Question: Why is it calling myChannel() each time, rather than just once when the connection is established? Is the async stub on the client side creating a new connection each time?
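One thing worth checking, since the client code is not shown: in gRPC, every invocation of the stub method (here stub.myChannel(responseObserver)) starts a new RPC, and the server-side myChannel() runs once per RPC, not once per connection. The sketch below reproduces both log patterns without gRPC, under the assumption that the client obtains requestObserver inside the loop. FakeService, openOnce and openPerMessage are hypothetical names, and the nested StreamObserver interface is a stand-in for io.grpc.stub.StreamObserver.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamCallSketch {

    /** Stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks). */
    public interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Fake server that records the same log lines as the real one. */
    public static class FakeService {
        public final List<String> log = new ArrayList<>();

        /** Plays the role of stub.myChannel(...): each call opens a new stream. */
        public StreamObserver<String> myChannel() {
            log.add("New client connected to *GRPC* server");
            return new StreamObserver<String>() {
                @Override public void onNext(String value) { log.add("Got a message!"); }
                @Override public void onError(Throwable t) { log.add("something bad happened"); }
                @Override public void onCompleted() { log.add("all done"); }
            };
        }
    }

    /** Intended shape: open the stream once, then send every message on it. */
    public static List<String> openOnce(int messages) {
        FakeService service = new FakeService();
        StreamObserver<String> requestObserver = service.myChannel();
        for (int i = 0; i < messages; i++) {
            requestObserver.onNext("msg " + i);
        }
        requestObserver.onCompleted();
        return service.log;
    }

    /** Suspected shape: the stream is (re)opened inside the loop. */
    public static List<String> openPerMessage(int messages) {
        FakeService service = new FakeService();
        StreamObserver<String> requestObserver = null;
        for (int i = 0; i < messages; i++) {
            requestObserver = service.myChannel(); // new stream on every iteration
            requestObserver.onNext("msg " + i);
        }
        if (requestObserver != null) {
            requestObserver.onCompleted(); // only the last stream is completed
        }
        return service.log;
    }

    public static void main(String[] args) {
        openOnce(5).forEach(System.out::println);
        System.out.println("---");
        openPerMessage(5).forEach(System.out::println);
    }
}
```

The first variant produces the expected log and the second produces the actual one, including the single "all done" at the end. If the real client has this shape, the streams would normally still share one channel, so it would be a new RPC per message rather than a new connection.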