I am writing a message-oriented middleware in which subscribers and publishers communicate via queues held in a central MessageBroker class. A publisher should put a message into a topic's queue if the queue is not full, and a subscriber should get a message from a subscribed topic's queue if the queue is not empty. The problem arises when I try to use wait() and notifyAll() in the subscriber's receiveMessage() method. With the publisher it works without any problems, but the subscriber threads are never woken from the wait state, so they do nothing.
Publisher method:
public synchronized void sendMessage() {
    BlockingQueue<Message> topicQueue = mb.getQueue(topic);
    if (topicQueue != null) {
        try {
            // Check that there is room in the queue before sending the message
            while (topicQueue.remainingCapacity() == 0) {
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Message m = topic.generateMessage();
        mb.publish(m);
        if (!gotActive) {
            mb.increasePublisherCounter(1);
            gotActive = true;
        }
        System.out.println(m.getContent() + " was added to the queue");
        notifyAll(); // Notify all threads
    }
}
Subscriber method:
public synchronized void receiveMessage() {
    for (Topic topic : topics) {
        BlockingQueue<Message> queue = mb.getQueue(topic);
        synchronized (queue) {
            try {
                // Wait until the queue is no longer empty
                while (queue.isEmpty()) {
                    queue.wait(); // Wait for a notification while the queue is empty
                }
                // Fetch the message from the queue
                Message message = queue.peek();
                if (message != null) {
                    System.out.println(name + " received message: " + message.getContent());
                    incrementProcessedCounter(topic);
                    if (topic.getSubscriberCount() == getProcessedCounter(topic)) {
                        queue.remove();
                        resetProcessedCounter(topic);
                        queue.notifyAll(); // Notify other waiting threads
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
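For comparison, here is a minimal, self-contained sketch of the behavior I am after: a producer blocks while the queue is full and a consumer blocks while it is empty. It is not my project code. SharedMonitorSketch, BoundedTopicQueue, put(), take() and runDemo() are hypothetical names made up for this illustration, and plain strings stand in for Message. In this sketch both sides call wait() and notifyAll() on the same object (the queue), whereas in my methods above the publisher uses its own monitor and the subscriber uses the queue's monitor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class SharedMonitorSketch {

    /** Hypothetical bounded queue: put() and take() synchronize on the same object. */
    static final class BoundedTopicQueue {
        private final Queue<String> items = new ArrayDeque<>();
        private final int capacity;

        BoundedTopicQueue(int capacity) {
            this.capacity = capacity;
        }

        synchronized void put(String message) throws InterruptedException {
            // Block while the queue is full; wait() releases this queue's monitor.
            while (items.size() == capacity) {
                wait();
            }
            items.add(message);
            notifyAll(); // Wake threads waiting on this same queue object.
        }

        synchronized String take() throws InterruptedException {
            // Block while the queue is empty.
            while (items.isEmpty()) {
                wait();
            }
            String message = items.remove();
            notifyAll(); // Wake producers waiting for free capacity.
            return message;
        }
    }

    /** Runs one producer thread against the calling thread as the consumer. */
    static List<String> runDemo(int capacity, int messageCount) {
        BoundedTopicQueue queue = new BoundedTopicQueue(capacity);
        Thread publisher = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    queue.put("msg-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        publisher.start();

        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < messageCount; i++) {
                received.add(queue.take());
            }
            publisher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(1, 3)); // prints [msg-0, msg-1, msg-2]
    }
}
```

With a capacity of 1, the producer has to wait after every message until the consumer has taken the previous one, and neither thread gets stuck. This sketch removes each message after a single read, so it does not cover my requirement that every subscriber of a topic sees a message before it is removed.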
What do I have to change in these methods to make them work? Thanks for your help.
I can also add more code if you need 🙂