I have a RabbitMQ consumer that extends UntypedConsumerActor, implemented as shown below. When it can't connect to the RabbitMQ endpoint, I get an "Operation timed out" exception and the whole system shuts down. I implemented a supervision strategy to catch this exception and restart the actor, but for some reason the exception is not caught and my system shuts down after the timeout.
RabbitMQ consumer actor:
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kantox.integration.worldlink.messages.akka.AkkaMessage;
import kantox.integration.worldlink.messages.akka.FXBookRequestMessage;
import kantox.integration.worldlink.messages.akka.FXCancelRequestMessage;
import kantox.integration.worldlink.messages.akka.PaymentInitiationRequestMessage;
import kantox.integration.worldlink.messages.akka.PaymentStatusInquiryRequestMessage;
import kantox.integration.worldlink.messages.akka.PaymentValidationRequestMessage;
import kantox.integration.worldlink.messages.akka.QuoteRequestMessage;
import kantox.integration.worldlink.messages.akka.StreamQuoteRequestMessage;
import java.util.HashMap;
import static kantox.integration.worldlink.utils.WorldlinkLogger.error;
import static kantox.integration.worldlink.utils.WorldlinkLogger.info;
/**
 * Camel consumer actor that reads JSON messages from the configured endpoint URI,
 * converts each one into the matching {@link AkkaMessage} subtype based on its
 * {@code "type"} property, and forwards it to the worldlink engine actor.
 *
 * <p>Messages whose {@code "type"} is missing, not a string, or not one of the known
 * request names are logged and dropped. Malformed JSON still surfaces as a
 * {@link JsonProcessingException} from {@link #onReceive(Object)}.
 */
public class RabbitConsumer extends UntypedConsumerActor {

    private static final String QUOTE_REQUEST = "QuoteRequestMessage";
    private static final String STREAM_QUOTE_REQUEST = "StreamQuoteRequestMessage";
    private static final String FX_BOOK_REQUEST = "FXBookRequestMessage";
    private static final String FX_CANCEL_REQUEST = "FXCancelRequestMessage";
    private static final String PAYMENT_VALIDATION = "PaymentValidationRequestMessage";
    private static final String PAYMENT_INITIATION = "PaymentInitiationRequestMessage";
    private static final String STATUS_INQUIRY = "PaymentStatusInquiryRequestMessage";

    // A configured ObjectMapper is thread-safe and expensive to build, so a single
    // shared instance replaces the per-message allocation.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final String rabbitUri;
    private final ActorRef worldlinkEngine;

    /**
     * Builds the {@link Props} used to create this actor.
     *
     * @param worldlinkEngine actor that receives the decoded request messages
     * @param rabbitUri       endpoint URI returned by {@link #getEndpointUri()}
     * @return props that construct a {@code RabbitConsumer} with the given arguments
     */
    public static Props props(ActorRef worldlinkEngine, String rabbitUri) {
        return Props.create(RabbitConsumer.class, () -> new RabbitConsumer(worldlinkEngine, rabbitUri));
    }

    /**
     * @param worldlinkEngine actor that receives the decoded request messages
     * @param rabbitUri       endpoint URI this consumer reads from
     */
    public RabbitConsumer(ActorRef worldlinkEngine, String rabbitUri) {
        super();
        this.rabbitUri = rabbitUri;
        this.worldlinkEngine = worldlinkEngine;
    }

    /**
     * Handles incoming {@link CamelMessage}s: logs the JSON body, decodes it and, when a
     * request message could be built, forwards it to the worldlink engine. Any other
     * message object is ignored.
     *
     * @param message the message delivered to this actor
     * @throws JsonProcessingException if the body is not valid JSON for the target type
     */
    @Override
    public void onReceive(Object message) throws JsonProcessingException {
        if (!(message instanceof CamelMessage)) {
            return;
        }
        CamelMessage camelMessage = (CamelMessage) message;
        String json = camelMessage.getBodyAs(String.class, getCamelContext());
        info("Received message: " + json);

        AkkaMessage requestMessage = getMessage(json);
        if (requestMessage != null) {
            worldlinkEngine.tell(requestMessage, self());
            info(requestMessage.getClass().getSimpleName() + " sent to worldlinkEngine");
        }
    }

    /** @return the endpoint URI supplied at construction time */
    @Override
    public String getEndpointUri() {
        return rabbitUri;
    }

    /**
     * Decodes {@code json} into the request message class named by its {@code "type"} property.
     *
     * @param json the raw JSON payload
     * @return the decoded message, or {@code null} when the type is missing or not recognised
     * @throws JsonProcessingException if the payload cannot be parsed or mapped
     */
    private AkkaMessage getMessage(String json) throws JsonProcessingException {
        HashMap<?, ?> request = MAPPER.readValue(json, HashMap.class);
        // A JSON literal `null` deserialises to a null map; treat it like a missing type
        // instead of failing with a NullPointerException.
        Object type = (request == null) ? null : request.get("type");
        if (type == null) {
            error("Type of message is mandatory");
            return null;
        }
        // A non-string "type" (number, object, ...) used to blow up with a
        // ClassCastException; it can never match a known name, so reject it explicitly.
        if (!(type instanceof String)) {
            error("Type of message is not valid");
            return null;
        }

        switch ((String) type) {
            case QUOTE_REQUEST:
                return MAPPER.readValue(json, QuoteRequestMessage.class);
            case STREAM_QUOTE_REQUEST:
                return MAPPER.readValue(json, StreamQuoteRequestMessage.class);
            case FX_BOOK_REQUEST:
                return MAPPER.readValue(json, FXBookRequestMessage.class);
            case FX_CANCEL_REQUEST:
                return MAPPER.readValue(json, FXCancelRequestMessage.class);
            case PAYMENT_VALIDATION:
                return MAPPER.readValue(json, PaymentValidationRequestMessage.class);
            case PAYMENT_INITIATION:
                return MAPPER.readValue(json, PaymentInitiationRequestMessage.class);
            case STATUS_INQUIRY:
                return MAPPER.readValue(json, PaymentStatusInquiryRequestMessage.class);
            default:
                error("Type of message is not valid");
                return null;
        }
    }
}
Supervision strategy implemented in the supervisor actor:
// Supervisor policy built as a OneForOneStrategy with 10 retries and a 30-second window.
// A ConnectException maps to "restart"; any other failure is logged via error(...)
// using its localized message and maps to "resume".
// NOTE(review): the accompanying log shows the ConnectException being reported by
// .../camel-supervisor/registry/consumerRegistrar ("failed to activate"), which suggests
// it is not raised inside an actor governed by this decider - confirm before relying on it.
private SupervisorStrategy strategy = new OneForOneStrategy(
        10,
        Duration.ofSeconds(30),
        DeciderBuilder
                .match(ConnectException.class, connectFailure -> SupervisorStrategy.restart())
                .matchAny(cause -> {
                    error(cause.getLocalizedMessage());
                    return SupervisorStrategy.resume();
                })
                .build());
Exception stacktrace:
[ERROR] [06/11/2024 14:20:43.176] [worldlink-akka.actor.default-dispatcher-4] [akka://worldlink/user/camel-supervisor/registry/consumerRegistrar] Actor[akka://worldlink/user/worldlinkEngine/$c#890483991] failed to activate
java.net.ConnectException: Operation timed out (Connection timed out)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:445)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:504)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:545)
at org.apache.camel.component.rabbitmq.RabbitMQEndpoint.connect(RabbitMQEndpoint.java:113)
at org.apache.camel.component.rabbitmq.RabbitMQConsumer.doStart(RabbitMQConsumer.java:50)
at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
at org.apache.camel.impl.DefaultCamelContext.startService(DefaultCamelContext.java:2083)
at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRouteConsumers(DefaultCamelContext.java:2377)
at org.apache.camel.impl.DefaultCamelContext.doStartRouteConsumers(DefaultCamelContext.java:2313)
at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:2243)
at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:2256)
at org.apache.camel.impl.DefaultCamelContext.startRouteService(DefaultCamelContext.java:2133)
at org.apache.camel.impl.DefaultCamelContext.startRoute(DefaultCamelContext.java:794)
at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:2109)
at org.apache.camel.impl.DefaultCamelContext.addRouteDefinitions(DefaultCamelContext.java:726)
at org.apache.camel.builder.RouteBuilder.populateRoutes(RouteBuilder.java:337)
at org.apache.camel.builder.RouteBuilder.addRoutesToCamelContext(RouteBuilder.java:264)
at org.apache.camel.impl.DefaultCamelContext.addRoutes(DefaultCamelContext.java:688)
at akka.camel.internal.ConsumerRegistrar$$anonfun$receive$4.applyOrElse(CamelSupervisor.scala:212)
at akka.actor.Actor.aroundReceive(Actor.scala:539)
at akka.actor.Actor.aroundReceive$(Actor.scala:537)
at akka.camel.internal.ConsumerRegistrar.aroundReceive(CamelSupervisor.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
at akka.actor.ActorCell.invoke(ActorCell.scala:581)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)