I’m implementing a functionality to read messages from a queue in SQS in AWS, process them and insert them into a table in dynamoDB.
I’m using quarkus and reactive programming with Mutiny to run all the operations I need before to insert the message into the table.
The issue I have is that the message is coming encoded, when I try to decode the message and the message is malformed I’m throwing a NonRetryableException.class
when that happens the entire section of code is stopped. For example, if I’m reading 10 messages from the queue and the message is malformed in the messageList number 5 the other 5 missing messages are not going to be processed because the function is stopped.
I tried to isolate the method which decode the messages but it didn’t work.
This is how my method is builded.
private Uni<List<MyTableClassDynamoDB>> processMessagesFromQueue(List<Message> messages) {
if (messages.isEmpty()) {
return Uni.createFrom().nullItem();
}
Log.infof("Number of messages to start to process: %s", messages.size());
Multi.createFrom().iterable(messages)
.onItem().transformToUniAndMerge(mapper::deserializeMessage) //HERE IS THE ERROR
.collect().asList()
.flatMap(service::filterDuplicates)
.map(mapper::transformDataToBeInsertedInDDB)
.flatMap(service::filterExistingIds)
.invoke(service::saveDataInBatch)
.onFailure(NonRetryableException.class).invoke(throwable -> Log.errorf("Error processing the messages %s: ", throwable.getMessage()))
.subscribe()
.with(item -> Log.info("The messages have been processed successfully"));
queueService.deleteMessagesBatch(queueUrl, messages).subscribe().with(
item -> Log.info("The messaages have been processed and deleted successfully"),
throwable -> Log.error("Error deleting the messages: ")
);
deserializeMessage: This method returns a Uni if
is processed. If there is an error it returns a NonRetryableExceptionfilterDuplicates: validate duplicates in dynamoDb table and returns a
Uni<List>transformDataToBeInsertedInDDB: map the Uni<List
to a list that can be inserted in DDB ListfilterExistingIds: Validate duplicates in the list to be inserted and
remove them from the list.saveDataInBatch: Save the messages that has been processed into
dynamoDB table. It’s a void method.
How can I handle the NonRetryableException to avoid to stop the iteration of the messages that exists in the list of messages to be processed?