The relevant source code is RebalancePushImpl#removeUnnecessaryMessageQueue.
In my opinion, the offset only needs to be persisted once, after the lock has been acquired.
    public boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq) {
        if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
            && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            // First persistence
            this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
            // remove order message queue: unlock & remove
            return tryRemoveOrderMessageQueue(mq, pq);
        } else {
            this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
            this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
            return true;
        }
    }

    private boolean tryRemoveOrderMessageQueue(final MessageQueue mq, final ProcessQueue pq) {
        try {
            // unlock & remove when no message is consuming or UNLOCK_DELAY_TIME_MILLS timeout (Backwards compatibility)
            boolean forceUnlock = pq.isDropped() && System.currentTimeMillis() > pq.getLastLockTimestamp() + UNLOCK_DELAY_TIME_MILLS;
            if (forceUnlock || pq.getConsumeLock().writeLock().tryLock(500, TimeUnit.MILLISECONDS)) {
                try {
                    // Second persistence
                    RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
                    RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
                    pq.setLocked(false);
                    RebalancePushImpl.this.unlock(mq, true);
                    return true;
                } finally {
                    if (!forceUnlock) {
                        pq.getConsumeLock().writeLock().unlock();
                    }
                }
            } else {
                pq.incTryUnlockTimes();
            }
        } catch (Exception e) {
            pq.incTryUnlockTimes();
        }
        return false;
    }
From reading the source code, I think the offset only needs to be persisted once, after the write lock has been acquired.
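
To make the suggestion concrete, here is a minimal, self-contained sketch of the ordered-consumption path with a single persist inside the write lock. It is not a patch against RocketMQ: `PersistOnceSketch`, `CountingOffsetStore`, and `removeOrderedQueue` are hypothetical stand-ins (the store only counts calls, and the queue is a plain `String`), and the `forceUnlock` branch and the broker-side unlock are left out for brevity.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PersistOnceSketch {

    /** Hypothetical stand-in for the offset store; it only counts calls. */
    static class CountingOffsetStore {
        final AtomicInteger persistCalls = new AtomicInteger();
        final AtomicInteger removeCalls = new AtomicInteger();

        void persist(String mq) {
            persistCalls.incrementAndGet();
        }

        void removeOffset(String mq) {
            removeCalls.incrementAndGet();
        }
    }

    private final CountingOffsetStore offsetStore;
    // Plays the role of ProcessQueue#getConsumeLock() in the original code.
    private final ReentrantReadWriteLock consumeLock = new ReentrantReadWriteLock();

    PersistOnceSketch(CountingOffsetStore offsetStore) {
        this.offsetStore = offsetStore;
    }

    /** Proposed flow: no persist before the lock, one persist while holding it. */
    boolean removeOrderedQueue(String mq) {
        try {
            if (consumeLock.writeLock().tryLock(500, TimeUnit.MILLISECONDS)) {
                try {
                    // The only persist: no consumer thread can advance the
                    // offset while the write lock is held.
                    offsetStore.persist(mq);
                    offsetStore.removeOffset(mq);
                    return true;
                } finally {
                    consumeLock.writeLock().unlock();
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report failure to the caller.
            Thread.currentThread().interrupt();
        }
        return false;
    }

    public static void main(String[] args) {
        CountingOffsetStore store = new CountingOffsetStore();
        PersistOnceSketch sketch = new PersistOnceSketch(store);
        boolean removed = sketch.removeOrderedQueue("queue-0");
        System.out.println("removed=" + removed
            + ", persistCalls=" + store.persistCalls.get()
            + ", removeCalls=" + store.removeCalls.get());
    }
}
```

One trade-off to weigh: in the current code the first persist runs even when the write lock cannot be acquired, whereas in this sketch nothing is persisted on a failed attempt. Whether that difference matters depends on how often the lock attempt fails and on what else persists offsets in the meantime.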