Description: A Json entity is uploaded using the JavaScript WebSocket API. The WebSocketHandler persists the received entity using ReactiveMongoRepository<Profile, String>.
Flux is transformed to Flux containing the persisted entity as Json text, and then to Flux
Requirement: To share the created entity to all browser connections. I have tried several configurations to create a unified flow consisting of the inbound and outbound streams. For all flows, only the Connection that sends an entity receives the update. Please find two examples explained.
Platform: SpringBoot 2.5, spring-boot-starter-data-mongodb-reactive, spring-boot-starter-webflux
// Profile.java
@Document
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Profile {
@Id
private String id;
private String email;
}
//Service.java
public Mono<Profile> create(String email){
return profileRepository.save( new Profile(null, email) ) ;
}
Example 1
//Constructor
@Autowired
public MyWebSocketHandler(ProfileService service, ObjectMapper objMapper) {
this.service = service;
this.mapper = objMapper;
System.out.println("MyHandle: ObjectMapper=" + mapper);
}
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> messageFlux = session.receive() <sup>1</sup>
.map(message -> { <sup>2</sup>
Profile entity = null;
try {
entity=mapper.readValue(message.getPayloadAsText(),Profile.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e.getMessage(), e);
}
return entity;
})
.flatMap(entity -> service.create(entity.getEmail())) <sup>3</sup>
.map (entity -> { <sup>4</sup>
String json = null;
try {
json = mapper.writeValueAsString(entity);
} catch (JsonProcessingException e) {
throw new RuntimeException(e.getMessage(), e);
}
return json;
})
.map(json -> session.textMessage(json)); <sup>5</sup>
return session.send(messageFlux); <sup>6</sup>
} //end handle
- WebSocketMessage#receive returns Flux
- Transform the type to Flux. Returns mapper.readValue(message.getPayloadAsText(),Profile.class)
- Persist the Profile and flatMap from Flux<Mono> to Flux
- Transform to Flux. ObjectMapper.writeValueAsString(Profile) returns a Json String.
- WebSocketSession#textMessage returns a WebSocketMessage containing Json as payload.
- Flux is the source passed to session#send returning Mono
Result:
Two connections are opened from different browsers.The connections are active as indicated by WebSocket#readyState. Only the browser that sends the Profile object (stringified) receives the onMessage callback.
Example2:
Uses SynchronousSink to transform Flux to Fluxconsisting of the persisted Entity converted to Json.
The transformations are the same for steps 1 – 3. At step 4, another Flux is created from Flux and SynchronousSink
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> messageFlux = session.receive() <sup>1</sup>
.map(message -> { <sup>2</sup>
Profile entity = null;
try {
entity=mapper.readValue(message.getPayloadAsText(),Profile.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e.getMessage(), e);
}
return entity;
})
.flatMap(entity -> service.create(entity.getEmail())) ; <sup>3</sup>
Flux<WebSocketMessage> fluxMessage = profileFlux.<String>handle( <sup>4</sup>
(profile, sink) -> {
String json = null;
try {
json = mapper.writeValueAsString(profile);
sink.next(json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e.getMessage(), e);
}
})
.map(str -> session.textMessage(str)).share(); <sup>5</sup>
return session.send(fluxMessage); <sup>6</sup>
} //end handle
- Flux#handle receives the Profile item from the Flux stream and SynchronousSink. Profile is converted to a Json String. Reactor creates a Flux containing a stream composed from onNext(T item) called on an underlying subscriber.
- Flux is transformed to Flux using WebSocketSession#textMessage.Flux#share is invoked on Flux.
- The payload is written and Mono returned.
Results: The same as Example 1.
Note: Example 2 is a revised version of this posted solution: Spring 5 Reactive WebSockets
Note: Spring Reference Documentation states that a flow consisting of inbound/outbound streams can be created by zipping the Fluxes. I am not sure if separating the flows will help. But I cannot figure out how to make both the input and output Fluxes of type WebSocketMessage. Since the input Flux terminates as type Profile in order to create the payload.