The following configuration that returns WebSocket#handle as a lambda expression shares the Entity to all browser connections.
@Bean
public WebSocketHandler webSocketHandler (ObjectMapper objectMapper,
ProfileCreatedEventListener eventListener) {
Flux<ProfileCreatedEvent> flux = Flux.create(eventListener).share();
return session -> {
Flux<WebSocketMessage> messageFlux = flux.map(evt -> {
try {
return objectMapper.writeValueAsString(evt.getSource());
} catch(IOException e) {
throw new RuntimeException(e);
}
})
.map(str -> {
return session.textMessage(str); //returns WebSocketMessage
});
return session.send(messageFlux);
} ; //end WebSocket#handle
} //end @Bean webSocketHandler
When I code exactly the same WebSocketHandler#handle method within a component that implements WebSocketHandler only the browser that connects first receives the Json entity. What is the difference?
@Configuration
@Bean
@Autowired
public HandlerMapping handlerMapping(MyWebSocketHandler wsh) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/profiles", wsh);
int order = 10; // after ProfileEndpointConfiguration
return new SimpleUrlHandlerMapping(map, order);
}
@Component
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<ProfileCreatedEvent> fluxEvent = Flux.create(listener).share();
Flux<WebSocketMessage> messageFlux = fluxEvent.map (
evt -> {
ObjectMapper objectMapper = new ObjectMapper();
String json = null;
try {
json = objectMapper.writeValueAsString(evt.getSource());
} catch(IOException e) {
throw new RuntimeException(e);
}
return json;
})
.map(str -> session.textMessage(str));
return session.send(messageFlux);
} //end handle
}
General Architecture:
A Json entity is uploaded to a RouterFunction. The Router is bound to a handler
that invokes ReactiveCrudRepository#save through a Service component. At Service#create
the inserted entity is published.
//Model
public class Profile {
@Id
private String id;
private String email;
}
public interface ProfileRepository extends ReactiveMongoRepository<Profile, String> {}
//Service Component
public ProfileService(ApplicationEventPublisher publisher, ProfileRepository profileRepository) {. . .}
public Mono<Profile> create(String email){
return profileRepository.save( new Profile(null, email) )
.doOnSuccess( profile -> publisher.publishEvent( new ProfileCreatedEvent( profile) ) );
}
public class ProfileCreatedEvent extends ApplicationEvent {
public ProfileCreatedEvent(Profile source) {
super( source );
}
}
//Handler
public Mono<ServerResponse> create(ServerRequest request){
final Flux<Profile> profileFlux = request.bodyToFlux( Profile.class )
.flatMap( p -> profileService.create( p.getEmail() ) );
return defaultWriteResponse( profileFlux );
}
//Note: The body is not returned in the ServerResponse, so that Profile entity can be published.
private static Mono<ServerResponse> defaultWriteResponse(Publisher<Profile> profiles) {
return Mono
.from(profiles)
.flatMap(p -> ServerResponse
.created( URI.create( "/profiles/" + p.getId()))
.contentType(MediaType.APPLICATION_JSON)
.build()
);
}
onApplicationEvent is coded on a component that
implements ApplicationListener and Consumer<FluxSink>.
The component is used as a Consumer for Flux#create.
Within the WebSocketHandler:
Flux fluxEvent = Flux.create(listener).share();
Flux#create(Consumer <FluxSink>) is used to return a Flux. When the Profile entity is published, Sink#next
passes the Profile entity to React’s Subscriber.
@Component
public class ProfileCreatedEventListener implements ApplicationListener <ProfileCreatedEvent>,
Consumer <FluxSink<ProfileCreatedEvent>> {
@Override
public void onApplicationEvent(ProfileCreatedEvent event) {
this.queue.offer(event);
}
@Override
public void accept(FluxSink<ProfileCreatedEvent> sink) {
this.executor.execute(() -> {
while(true) {
try {
ProfileCreatedEvent event = que.take()
sink.next(event) ;
} catch (InterruptedException e) {}
}//end while
} //end lambda
) ; //end execute
} //end accept
}//end class
Configurations are tested using curl to upload an entity the router. A simple HTML view includes the JavaScript.WebSocket#onMessage defines a callback that displays the Json text.
The code that works was obtained from the tutorial: Build Reactive APIs with Spring WebFlux by Matt Raible
https://developer.okta.com/blog/2018/09/24/reactive-apis-with-spring-webflux