I hope you are all well. I would like to share a problem I am facing in my messaging architecture.
Architecture:
Web clients: Connect to RabbitMQ using the STOMP protocol.
Exchange: Clients subscribe to an exchange that holds the messages.
Client web snippet:
const topic = '/exchange/topic_eventos_live_testing/*.*.*.evento_12345';
return this.wsService.observe<{action: any, data: any}>(topic);
public observe<T>(topic: string, mapper?: (body: any) => T): SubscriptionPoolEntity<T> {
if (!this.subscriptionPool.hasOwnProperty(topic)) {
this.subscriptionPool[topic] = new SubscriptionPoolEntity<T>(mapper);
}
if (!this.subscriptionPool[topic].active) {
this.connectChannel<T>(topic);
}
return this.subscriptionPool[topic];
}
Problem:
This approach generates a large number of queues in RabbitMQ, directly proportional to the number of users, which is not scalable.
Intermediate component:
There is a component between RabbitMQ and STOMP that handles security validation. I decided to modify this component to dynamically change the subscription destination to streams in RabbitMQ, instead of static queues.
Intermediate component code:
`@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
List<String> authorization = accessor.getNativeHeader("X-Authorization");
if (!StringUtils.isEmpty(authorization)) {
try {
String accessToken = authorization.get(0);
Jwt jwt = jwtDecoder.decode(accessToken);
JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
Authentication authentication = converter.convert(jwt);
accessor.setUser(authentication);
SecurityContextHolder.getContext().setAuthentication(authentication);
} catch (JwtValidationException e) {
LOGGER.info("*** JWT expired, reconnecting client ***");
}
}
}
return message;
}
}, new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message);
if (StompCommand.SUBSCRIBE.equals(headerAccessor.getCommand())) {
Principal userPrincipal = headerAccessor.getUser();
String destination = headerAccessor.getDestination();
LOGGER.info("destination=" + destination);
if (destination.startsWith(prematchEventTopic)) {
String queuePrematchEventos = destination.substring(prematchEventTopic.length());
headerAccessor.addNativeHeader("x-queue-name", "prematch." + queuePrematchEventos);
headerAccessor.addNativeHeader("x-queue-type", "stream");
headerAccessor.addNativeHeader("auto-delete", "false");
headerAccessor.addNativeHeader("durable", "true");
headerAccessor.setDestination(TOPIC_PREMATCH + queuePrematchEventos);
}
if (destination.startsWith(topicEventoVirtual)) {
String queuePrematchEventos = destination.substring(topicEventoVirtual.length());
headerAccessor.addNativeHeader("x-queue-name", "virtual." + queuePrematchEventos);
headerAccessor.addNativeHeader("x-queue-type", "stream");
headerAccessor.addNativeHeader("auto-delete", "false");
headerAccessor.addNativeHeader("durable", "true");
headerAccessor.setDestination(TOPIC_VIRTUAL + queuePrematchEventos);
}
if (destination.startsWith(liveEventTopic)) {
String queueLiveEventos = destination.substring(liveEventTopic.length());
headerAccessor.addNativeHeader("x-queue-name", "live." + queueLiveEventos);
headerAccessor.addNativeHeader("x-queue-type", "stream");
headerAccessor.addNativeHeader("auto-delete", "false");
headerAccessor.addNativeHeader("durable", "true");
headerAccessor.setDestination(TOPIC_LIVE + queueLiveEventos);
}
if (destination.startsWith(activeBetsTopic)) {
LOGGER.info("headerAccessor=" + headerAccessor + ", userPrincipal=" + headerAccessor.getUser());
if (!validateSubscription(userPrincipal, headerAccessor.getDestination())) {
throw new IllegalArgumentException("No permission for this topic");
}
String queueApuestasActivas = "apuestas_activas_" + ((JwtAuthenticationToken) userPrincipal).getToken().getClaimAsString(JWT_USERNAME_CLAIM);
headerAccessor.addNativeHeader("x-queue-name", queueApuestasActivas);
}
return MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
}
return message;
}
});
}`
Specific problem:
When changing the destination in real-time and configuring the queues as streams, I encounter the following error in RabbitMQ:
prefetch count is not set for ‘queue ‘test_stream’ in vhost ‘/’
Full error log:
Intermediate component: Java Spring WebSocket
RabbitMQ: 3.10.7
Web client: Angular with STOMP.js
Objective:
I want to dynamically configure streams in RabbitMQ based on the routing key of a topic, so that RabbitMQ creates streams according to the routing key. I am looking for a solution to avoid creating static queues for each client and instead use streams for better scalability.
Expectations:
I hope to get guidance on how to correctly configure streams in RabbitMQ from an intermediate component and how to avoid the mentioned error. Any suggestions on how to improve this architecture would also be appreciated.
Thank you for your help and time!
I was hoping that by modifying the destination of the subscriptions and configuring the message headers, the messages would be routed correctly to the corresponding streams in RabbitMQ. This should allow me to avoid creating static queues and improve the scalability of my system.
Francisco Espinosa Gonzales is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.