I am implementing an in-memory pub/sub system in Java.
There are multiple publishers and multiple subscribers.
The publisher part is easy.
I see two ways to implement the subscribers:
- periodic wake-up
- Java's Object wait() and notify()
In the periodic wake-up approach, my subscriber reads all the messages that are available and then sleeps for a fixed duration. The only issue with this approach is that the consumer can read new messages only after the fixed duration has passed. I cannot reduce the sleep duration too much because that would result in too much CPU usage.
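A minimal sketch of the periodic wake-up approach, to make the trade-off concrete. The class name `PollingSubscriber`, the `sleepMillis` parameter, and the use of a plain `List<String>` in place of my topic are assumptions for illustration only:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical polling subscriber; all names here are illustrative.
class PollingSubscriber implements Runnable {
    private final List<String> messages;     // shared log, appended to by publishers
    private final Consumer<String> handler;  // what to do with each message
    private final long sleepMillis;          // fixed sleep between polls
    private int offset = 0;                  // touched only by the subscriber thread

    PollingSubscriber(List<String> messages, Consumer<String> handler, long sleepMillis) {
        this.messages = messages;
        this.handler = handler;
        this.sleepMillis = sleepMillis;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Drain everything that is currently available.
                while (offset < messages.size()) {
                    handler.accept(messages.get(offset));
                    offset++;
                }
                // A message arriving now waits up to sleepMillis before it is read.
                Thread.sleep(sleepMillis);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the thread exit.
            Thread.currentThread().interrupt();
        }
    }
}
```

With this shape, the worst-case delivery delay is roughly `sleepMillis`, and a smaller value means more wake-ups that find nothing to read.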
In the wait and notify approach, rather than sleeping for a fixed duration, I invoke wait() on the current object. Whenever new messages arrive, the producer invokes the wakeUp() method. This results in lower end-to-end latency and fewer CPU cycles wasted on checking whether a new message has arrived.
Below is a sample implementation of wait and notify:
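    public void run() {
        synchronized (this) {
            try {
                do {
                    // Block until a message exists at the current offset.
                    while (offset >= topic.getMessages().size()) {
                        this.wait();
                    }
                    String message = topic.getMessages().get(offset);
                    consumer.consumeMessage(message);
                    offset = offset + 1;
                } while (true);
            } catch (InterruptedException e) {
                // wait() throws a checked exception; restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    public void wakeUp() {
        synchronized (this) {
            notify();
        }
    }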
But the problem with this approach is that when the consumer is already running (not waiting), every wakeUp() invocation has to wait to acquire the lock on this. That results in contention in the wakeUp() method.
Is there a way to notify only if the object is already waiting?
One hack I can think of is to compare offset with the latest offset and skip synchronized (this) when the difference is large, but I don't see this as a fail-safe method.
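A rough sketch of that offset-comparison hack, only to show what I mean. The class name `LagAwareSubscriber`, the `LAG_THRESHOLD` value, the `advance()` helper, and the `boolean` return from `wakeUp()` are all assumptions made for illustration, and a plain `List<String>` stands in for `topic.getMessages()`:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration of the hack; NOT a fail-safe design.
class LagAwareSubscriber {
    private static final int LAG_THRESHOLD = 100;  // assumed value, would need tuning
    private final List<String> messages;           // stands in for topic.getMessages()
    private volatile int offset = 0;               // written by the consumer thread only

    LagAwareSubscriber(List<String> messages) {
        this.messages = messages;
    }

    // Consumer side: called after each message has been consumed.
    void advance() {
        offset = offset + 1;
    }

    // Producer side: returns true if it took the lock and called notify().
    boolean wakeUp() {
        int lag = messages.size() - offset;
        if (lag > LAG_THRESHOLD) {
            // Consumer is far behind, so it is presumably busy rather than waiting.
            return false;
        }
        synchronized (this) {
            notify();
        }
        return true;
    }
}
```

This still contends on the lock whenever the lag is at or below the threshold. It can also miss a wake-up: if the consumer is waiting and more than `LAG_THRESHOLD` messages are appended before any producer reaches the lag check, every producer skips `notify()`.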