I have an Angular front end and a Spring Boot backend, and I need to stream messages from the backend to the front end. I am using WebFlux to send the messages and reading them in the UI as server-sent events (SSE). The basic setup works fine. Below is my streaming endpoint.
This works:
@GetMapping(value = "/get/events")
public Flux<MyClass> getStreamingEvents() {
    Flux<MyClass> myflux = sinkConfig.getMysink().asFlux().cache();
    return myflux;
}
This does not work. It has stopped sending any messages to the UI:
@GetMapping(value = "/get/events")
public Flux<MyClass> getStreamingEvents() {
    Flux<MyClass> myflux = sinkConfig.getMysink().asFlux()
        .doOnNext(message -> {
            log.info("Message received");
            resetTimer();
        })
        .doOnCancel(() -> log.info("Stream has been canceled"))
        .doFinally(signalType -> {
            if (signalType == SignalType.CANCEL) {
                log.info("Cancelled");
            }
        })
        .cache();
    return myflux;
}
private void resetTimer() {
    // code to reset the timer; if the timer expires, cancel the stream
    cancelFlux();
}
private void cancelFlux() {
    // intended to cancel the stream
    Disposable d = sinkConfig.getMysink().asFlux();
    d.dispose();
}
I have two issues:
- Once the connection is established from the browser, I see the messages flowing. But after some time, even when no messages are being sent, CPU utilization spikes and the whole server hangs.
- I think this happens because the connection is still open. How do I make sure the sink is closed if no messages arrive for, say, 5 minutes? I could cancel the Flux and dispose of the connection, but my second snippet above is not working.
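To make the intent of the idle timeout concrete, here is a rough sketch of the kind of timer resetTimer() is meant to drive: every message restarts a countdown, and a callback runs once if the countdown expires. It uses only java.util.concurrent. IdleTimer, reset(), and the onIdle callback are hypothetical names for illustration, not part of my code, Spring, or Reactor, and the 200 ms timeout in the demo stands in for the 5 minutes.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs onIdle once if reset() is not called again
// within idleTimeout of the most recent reset() call.
final class IdleTimer implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "idle-timer");
                t.setDaemon(true); // do not keep the JVM alive for the timer
                return t;
            });
    private final long idleMillis;
    private final Runnable onIdle;
    private ScheduledFuture<?> pending; // guarded by "this"

    IdleTimer(Duration idleTimeout, Runnable onIdle) {
        this.idleMillis = idleTimeout.toMillis();
        this.onIdle = onIdle;
    }

    // Call once per received message: cancels the pending countdown,
    // if any, and starts a fresh one.
    synchronized void reset() {
        if (scheduler.isShutdown()) {
            return; // timer already closed, nothing to schedule
        }
        if (pending != null) {
            pending.cancel(false); // does not interrupt a callback already running
        }
        pending = scheduler.schedule(onIdle, idleMillis, TimeUnit.MILLISECONDS);
    }

    // Stops the timer and discards any pending countdown.
    @Override
    public synchronized void close() {
        scheduler.shutdownNow();
    }
}

class IdleTimerDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch idle = new CountDownLatch(1);
        try (IdleTimer timer = new IdleTimer(Duration.ofMillis(200), idle::countDown)) {
            timer.reset();      // first "message"
            Thread.sleep(100);
            timer.reset();      // second "message" restarts the countdown
            boolean fired = idle.await(2, TimeUnit.SECONDS);
            System.out.println("idle callback fired: " + fired);
        }
    }
}
```

In the endpoint, reset() would be called from doOnNext, and the onIdle callback is where the stream would be closed. If getMysink() returns a Reactor Sinks.Many (an assumption, since its type is not shown here), that callback might call tryEmitComplete() on it. Reactor's built-in Flux.timeout(Duration) operator may be another route to the same effect without a hand-written timer.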