Prior to Spring Boot 3.2.x I was able to “preserve receive order” by configuring the ClientInboundChannel of the WebSocket server with a single thread (not a scalable solution). This technique stopped working with 3.2.x and 3.3.x, and I noticed that the StompEndpointRegistry.preserveReceiveOrder property is now available.
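For reference, the two configurations described above might look roughly like the sketch below. The class name WebSocketConfig and the endpoint path "/ws" are placeholders chosen for illustration and are not taken from the linked project; the single-thread variant is shown only as a comment.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

// Hypothetical configuration class; the name and the "/ws" path are assumptions.
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws"); // placeholder endpoint path
        // The property discussed in this report: process each session's
        // inbound messages in the order they were received.
        registry.setPreserveReceiveOrder(true);
    }

    // Earlier approach (single-threaded inbound channel, not scalable):
    //
    // @Override
    // public void configureClientInboundChannel(ChannelRegistration registration) {
    //     registration.taskExecutor().corePoolSize(1).maxPoolSize(1);
    // }
}
```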
When testing this configuration with a large number (over 300) of subscribes/unsubscribes, I consistently get a ConcurrentModificationException when calling SimpMessageHeaderAccessor.wrap inside a subscription event listener like this:
@Component
@Slf4j
public class SubscriptionEventListener implements ApplicationListener<SessionSubscribeEvent> {

    @Override
    public void onApplicationEvent(@NonNull SessionSubscribeEvent event) {
        log.info("SessionSubscribeEvent: {}", event);
        SimpMessageHeaderAccessor wrapped = SimpMessageHeaderAccessor.wrap(event.getMessage());
        log.info("SimpMessageHeaderAccessor: {}", wrapped);
    }
}
This is the stack trace:
2024-06-12 16:05:39:087 +0200 [http-nio-auto-1-exec-6] ERROR org.springframework.web.socket.messaging.StompSubProtocolHandler - Error publishing SessionSubscribeEvent[GenericMessage [payload=byte[0], headers={simpMessageType=SUBSCRIBE, stompCommand=SUBSCRIBE, nativeHeaders={destination=[/app/ping], id=[414]}, simpSessionAttributes={}, simpHeartbeat=[J@434f7d03, simpSubscriptionId=414, simpNextMessageTask=org.springframework.messaging.simp.broker.OrderedMessageChannelDecorator$PostHandleTask@631ceecc, simpSessionId=2aa47101-8596-f4fc-5072-ca1bf15edb56, simpDestination=/app/ping}]]
java.util.ConcurrentModificationException
at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1597)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1630)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1628)
at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$1.next(Collections.java:1711)
at java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$1.next(Collections.java:1704)
at java.base/java.util.HashMap.putMapEntries(HashMap.java:511)
at java.base/java.util.HashMap.<init>(HashMap.java:484)
at org.springframework.messaging.MessageHeaders.<init>(MessageHeaders.java:138)
at org.springframework.messaging.support.MessageHeaderAccessor$MutableMessageHeaders.<init>(MessageHeaderAccessor.java:639)
at org.springframework.messaging.support.MessageHeaderAccessor.<init>(MessageHeaderAccessor.java:139)
at org.springframework.messaging.support.NativeMessageHeaderAccessor.<init>(NativeMessageHeaderAccessor.java:73)
at org.springframework.messaging.simp.SimpMessageHeaderAccessor.<init>(SimpMessageHeaderAccessor.java:112)
at org.springframework.messaging.simp.SimpMessageHeaderAccessor.wrap(SimpMessageHeaderAccessor.java:279)
at com.example.websocket.SubscriptionEventListener.onApplicationEvent(SubscriptionEventListener.java:16)
at com.example.websocket.SubscriptionEventListener.onApplicationEvent(SubscriptionEventListener.java:10)
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:185)
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:178)
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:156)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:451)
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:384)
at org.springframework.web.socket.messaging.StompSubProtocolHandler.publishEvent(StompSubProtocolHandler.java:448)
at org.springframework.web.socket.messaging.StompSubProtocolHandler.handleMessageFromClient(StompSubProtocolHandler.java:352)
at org.springframework.web.socket.messaging.SubProtocolWebSocketHandler.handleMessage(SubProtocolWebSocketHandler.java:356)
at org.springframework.web.socket.handler.WebSocketHandlerDecorator.handleMessage(WebSocketHandlerDecorator.java:75)
at org.springframework.web.socket.handler.LoggingWebSocketHandlerDecorator.handleMessage(LoggingWebSocketHandlerDecorator.java:56)
at org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator.handleMessage(ExceptionWebSocketHandlerDecorator.java:58)
at org.springframework.web.socket.adapter.standard.StandardWebSocketHandlerAdapter.handleTextMessage(StandardWebSocketHandlerAdapter.java:113)
at org.springframework.web.socket.adapter.standard.StandardWebSocketHandlerAdapter$3.onMessage(StandardWebSocketHandlerAdapter.java:84)
at org.springframework.web.socket.adapter.standard.StandardWebSocketHandlerAdapter$3.onMessage(StandardWebSocketHandlerAdapter.java:81)
at org.apache.tomcat.websocket.WsFrameBase.sendMessageText(WsFrameBase.java:390)
at org.apache.tomcat.websocket.server.WsFrameServer.sendMessageText(WsFrameServer.java:130)
at org.apache.tomcat.websocket.WsFrameBase.processDataText(WsFrameBase.java:484)
at org.apache.tomcat.websocket.WsFrameBase.processData(WsFrameBase.java:284)
at org.apache.tomcat.websocket.WsFrameBase.processInputBuffer(WsFrameBase.java:130)
at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:85)
at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:184)
at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:164)
at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:152)
at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:60)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:57)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1190)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:63)
at java.base/java.lang.Thread.run(Thread.java:840)
I have created a minimal project with a test to demonstrate the problem: here. The error does not occur when the StompEndpointRegistry is configured with setPreserveReceiveOrder(false).
Is there some necessary configuration I am missing? I suspect the error could be caused by org.springframework.messaging.simp.broker.OrderedMessageChannelDecorator, which modifies the message headers, but I don't know enough about the threading model of the Spring WebSocket server to see how to avoid the error. I have tried simple solutions, such as synchronizing on the Message object of the SessionSubscribeEvent inside the SubscriptionEventListener, but this doesn't seem to help. Any guidance would be greatly appreciated.
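To illustrate the mechanism the stack trace points at, here is a minimal JDK-only sketch. It is not Spring's code: the class HeaderCopyRace and its method are hypothetical, and the header names are only borrowed from the log above. It shows that reading a HashMap through an unmodifiable view fails fast once the backing map has been structurally modified, which is what the copy in the MessageHeaders constructor appears to run into. The mutation happens on the same thread here so that the outcome is deterministic; in the reported scenario it would presumably come from another thread.

```java
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical demo class, not part of Spring or of the linked project.
class HeaderCopyRace {

    /**
     * Returns true if iterating an unmodifiable view fails after the
     * backing HashMap has been structurally modified mid-iteration.
     */
    static boolean copyFailsAfterMutation() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("simpSessionId", "session-1");
        headers.put("simpSubscriptionId", "414");

        // Read-only view; it still reflects changes made to the backing map.
        Map<String, Object> view = Collections.unmodifiableMap(headers);

        // Start reading, as a copy constructor such as new HashMap<>(view) would.
        Iterator<Map.Entry<String, Object>> it = view.entrySet().iterator();
        it.next();

        // Stand-in for a writer adding a header while the read is in progress.
        headers.put("simpNextMessageTask", new Object());

        try {
            it.next(); // the iterator detects the changed modification count
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(copyFailsAfterMutation()); // prints "true"
    }
}
```

If this is indeed what happens, it would also explain why synchronizing in the listener has no effect: a lock only helps when the code that writes the headers acquires the same lock, and nothing in the listener can make it do so.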