The processMessage method returns a CompletionStage:
CompletionStage<PubsubMessage> processedMessage = processMessage(message, parsedMessage);
while getPublishers returns a List of publishers:
protected List<MessagePublisher> getPublishers() {
return stepConfig.getPublishers().values().stream().toList();
}
- All the publishers have to publish the processedMessage
CompletionStage<Stream<CompletionStage<String>>> streamCompletionStage = processedMessage.thenApply(
pubsubMessage -> {
return getPublishers().stream()
.map(messagePublisher -> messagePublisher.publish(pubsubMessage));
});
- Previously getPublisher was just returning a single publisher, and I was able to do the following:
return processMessage(message, parsedMessage)
.thenApply(pubsubMessage -> getPublisher().publish(pubsubMessage))
.thenApply(
result -> {
try {
return Result.ACK;
} catch (Exception e) {
return Result.NACK;
}
});
- I have to return Result.NACK if even one of the publish attempts fails, and Result.ACK otherwise.
My knowledge of Java concurrency is somewhat limited, and I am not sure how to go about this. Any suggestions or help would be welcome.
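One direction that might work is to flatten the nested stages with thenCompose and combine the per-publisher futures with CompletableFuture.allOf, which completes exceptionally if any of them fails. The following is only a minimal sketch under assumptions: PublishAllSketch and publishToAll are hypothetical names, MessagePublisher and Result are simplified stand-ins for the real types, and a plain String takes the place of PubsubMessage.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class PublishAllSketch {

    // Hypothetical stand-ins for the real Result and MessagePublisher types.
    enum Result { ACK, NACK }

    interface MessagePublisher {
        // Assumption: publish returns a stage carrying the published message id.
        CompletionStage<String> publish(String message);
    }

    // Returns ACK only if processing and every publish succeed, NACK otherwise.
    static CompletionStage<Result> publishToAll(
            CompletionStage<String> processedMessage,
            List<MessagePublisher> publishers) {
        return processedMessage
                .thenCompose(message -> {
                    // Start every publish and keep one future per publisher.
                    CompletableFuture<?>[] publishes = publishers.stream()
                            .map(publisher -> publisher.publish(message).toCompletableFuture())
                            .toArray(CompletableFuture[]::new);
                    // allOf waits for all futures and fails if any one of them failed.
                    return CompletableFuture.allOf(publishes);
                })
                // handle sees either a normal completion or the failure, never both.
                .handle((ignored, error) -> error == null ? Result.ACK : Result.NACK);
    }
}
```

With this shape, a failure in processMessage itself also maps to NACK, because handle runs for any exceptional completion earlier in the chain. If the individual publish results are needed later, the futures could be kept in a list and read after allOf completes.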