I am using Aeron Archive replay to recover missed messages and then switch to the live feed, so that there are no gaps in the messages. I keep a persistent map from sessionId/channel/streamId to the last position received, and I start each replay request from that position. The replay itself seems to work well. However, the transition from polling the replay subscription to polling the regular subscription is not working: a few messages that I already received on the replay channel arrive again on the regular channel. Most likely the problem is in my own logic.
Example log:
2024-06-17T23:50:41.324+05:30 INFO 24170 --- [ receiver] c.x.r.codec.MsgFragmentHandler : Received message from replay: streamId=20 position=1152 sequenceNumber=18, messageCode=tsR6L90S
2024-06-17T23:50:41.324+05:30 INFO 24170 --- [ receiver] c.x.r.codec.MsgFragmentHandler : Received message from replay: streamId=20 position=1216 sequenceNumber=19, messageCode=rMjO2Opq
2024-06-17T23:50:41.325+05:30 INFO 24170 --- [ receiver] c.x.r.codec.MsgFragmentHandler : Received message from replay: streamId=20 position=1280 sequenceNumber=20, messageCode=Rbff7mTU
2024-06-17T23:50:41.325+05:30 INFO 24170 --- [ receiver] c.x.r.codec.MsgFragmentHandler : Received message from replay: streamId=20 position=1344 sequenceNumber=21, messageCode=qoQAktwI
2024-06-17T23:50:41.342+05:30 INFO 24170 --- [ receiver] c.x.r.codec.MsgFragmentHandler : Received message: streamId=1002 position=1216 sequenceNumber=19, messageCode=rMjO2Opq
2024-06-17T23:50:41.342+05:30 INFO 24170 --- [ receiver] c.x.r.codec.MsgFragmentHandler : Received message: streamId=1002 position=1280 sequenceNumber=20, messageCode=Rbff7mTU
2024-06-17T23:50:41.344+05:30 INFO 24170 --- [ receiver] c.x.r.codec.MsgFragmentHandler : Received message: streamId=1002 position=1344 sequenceNumber=21, messageCode=qoQAktwI
2024-06-17T23:50:42.514+05:30 INFO 24170 --- [ receiver] c.x.r.codec.MsgFragmentHandler : Received message: streamId=1002 position=1408 sequenceNumber=22, messageCode=j5wTJXlw
As the logs above show, the messages with sequenceNumber 19, 20 and 21 are duplicates. Lines logged as "Received message from replay" came from the replay subscription (streamId=20), and lines logged as "Received message" came from the regular subscription (streamId=1002).
My code is as follows:
The agent that receives messages uses its state to decide whether to start a replay or to poll the regular subscription.
@Override
public int doWork() {
    switch (currentState) {
        case AERON_READY -> {
            if (!isReplayInProgress()) {
                replay();
            }
            break;
        }
        case POLLING_SUBSCRIPTION -> subscription.poll(fragmentHandler, 100);
        default -> LOGGER.error("unknown state {}", currentState);
    }
    return 0;
}
My replay function is:
public void replay() {
    if (!replayInProgress.compareAndSet(false, true)) {
        return;
    }
    final long recordingId = findLatestRecording(aeronArchive, IPC_CHANNEL, receiverStreamId);
    long lastPosition = lastMessagePositionMap.getOrDefault(lastMessagePositionKey, 0L);
    LOGGER.info("Requesting replay from lastPosition {}", lastPosition);
    if (recordingId != -1 && lastPosition != aeronArchive.getRecordingPosition(recordingId)) {
        LOGGER.info("{} recordingId found from findLatestRecording", recordingId);
        LOGGER.info("{} receivedPosition found ", lastReceivedMessagePosition.get());
        LOGGER.info("{} getRecordingPosition found ", aeronArchive.getRecordingPosition(recordingId));
        LOGGER.info("{} receivedMessageCount found ", receivedMessageCount.get());
        final long position = lastPosition;
        final long length = Long.MAX_VALUE;
        try {
            final long sessionId = aeronArchive.startReplay(recordingId, position, length, IPC_CHANNEL, 20);
            LOGGER.info("Replay SessionID: {} requested starting position {} ", sessionId, position);
            final String channelRead = ChannelUri.addSessionId(IPC_CHANNEL, (int) sessionId);
            final Subscription replaySubscription = aeronArchive.context().aeron().addSubscription(channelRead, 20);
            while (!replaySubscription.isConnected()) {
                idleStrategy.idle();
            }
            boolean replayCompleted = false;
            while (!replayCompleted) {
                int fragments = replaySubscription.poll(fragmentAssembler, 100);
                if (fragments > 0) {
                    long currentReplayPosition = replaySubscription.imageAtIndex(0).position();
                    lastMessagePositionMap.put(lastMessagePositionKey, currentReplayPosition);
                }
                receivedMessageCount.incrementAndGet();
                lastReceivedMessagePosition.set(replaySubscription.imageAtIndex(0).position());
                long currentReplayPosition = lastReceivedMessagePosition.get();
                long endPosition = aeronArchive.getRecordingPosition(recordingId);
                replayCompleted = currentReplayPosition >= endPosition;
                if (fragments == 0) {
                    idleStrategy.idle();
                }
            }
            aeronArchive.stopReplay(sessionId);
            replaySubscription.close();
            currentState = State.POLLING_SUBSCRIPTION;
        } catch (Exception e) {
            LOGGER.error("Error during replay", e);
        } finally {
            replayInProgress.set(false);
        }
    } else {
        replayInProgress.set(false);
    }
}
Do you see the reason for the duplicates here? I am going to add checks for possible duplicates anyway, but I wanted to check whether the replay is handled correctly here.
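For reference, the duplicate check I have in mind is roughly the sketch below. It assumes that sequenceNumber increases strictly within one logical feed and that messages arrive in order, which matches the log above but is otherwise an assumption. The class name SequenceDeduplicator, its accept method, and the "feed" key are hypothetical names used only for illustration and are not part of Aeron. The key is a logical feed name rather than the streamId, because the replay (streamId=20) and the regular subscription (streamId=1002) use different stream IDs for the same messages.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Aeron): drops messages whose
 * sequenceNumber has already been delivered for a given logical feed.
 * Not thread-safe; intended to be called from the single agent thread.
 */
final class SequenceDeduplicator {
    // Highest sequenceNumber delivered so far, keyed by logical feed name.
    private final Map<String, Long> lastDelivered = new HashMap<>();

    /**
     * Returns true if the message is new and should be processed,
     * false if its sequenceNumber is not higher than the last one delivered.
     */
    boolean accept(String feed, long sequenceNumber) {
        Long last = lastDelivered.get(feed);
        if (last != null && sequenceNumber <= last) {
            return false; // already seen, e.g. via the replay subscription
        }
        lastDelivered.put(feed, sequenceNumber);
        return true;
    }
}
```

Both the replay fragment handler and the regular fragment handler would call accept with the same feed key before processing a message. With the log above, sequence numbers 18 to 21 from the replay are accepted, 19 to 21 from the regular subscription are rejected, and 22 is accepted.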