I am trying to understand the working of semaphores in the context of producers and consumers. Have a simple bounded queue containing push & pop APIs. Also, managing multiple producers and consumers using Semaphores but still contention is occurring,
Queue class
class BoundedQueue {
private Queue<Integer> q;
int maxSize;
Semaphore produce;
Semaphore consume;
BoundedQueue(int size) throws InterruptedException {
maxSize = size;
q = new LinkedList<>();
produce = new Semaphore(maxSize);
consume = new Semaphore(-1);
}
void push(int val) {
try {
produce.acquire();
q.offer(val);
System.out.println(Thread.currentThread().getName() + " produced " + val);
if (q.size() > maxSize) {
System.out.println("Max size breached");
}
consume.release();
}
catch (InterruptedException e) {
System.out.println("Interrupted exception " + e);
produce.release();
}
}
int pop() {
int val = -1;
try {
consume.acquire();
if (q.isEmpty()) {
System.out.println(Thread.currentThread().getName() + " is trying to consume empty queue");
return val;
}
val = q.remove();
System.out.println(Thread.currentThread().getName() + " consumed " + val);
if (q.size() > maxSize) {
System.out.println("Max size breached");
}
produce.release();
}
catch (InterruptedException e) {
System.out.println("Interrupted exception " + e);
consume.release();
}
return val;
}
}
Multiple producers and consumers that are expected to work in tandem
public class ProducerConsumerSemaphores extends Thread {
private BoundedQueue que;
private String workerType;
ProducerConsumerSemaphores(BoundedQueue que, String workerType) {
this.que = que;
this.workerType = workerType;
}
public void run() {
System.out.println("Starting worker " + workerType + " " + Thread.currentThread().getName());
if (workerType.equals("producer")) {
Random rand = new Random();
int val = rand.nextInt(1, 100);
for (int i=0; i<10; ++i) {
que.push(val+i);
try {
Thread.sleep(10);
}
catch (Exception e) {
System.out.println("Caught " + e);
}
}
}
else {
for (int i=0; i<10; ++i) {
que.pop();
try {
Thread.sleep(100);
}
catch (Exception e) {
System.out.println("Caught " + e);
}
}
}
System.out.println("Ending woker " + workerType + " " + Thread.currentThread().getName());
}
public static void main(String args[]) {
System.out.println();
try {
BoundedQueue que = new BoundedQueue(5);
ProducerConsumerSemaphores p1 = new ProducerConsumerSemaphores(que, "producer");
p1.setName("p1");
ProducerConsumerSemaphores p2 = new ProducerConsumerSemaphores(que, "producer");
p2.setName("p2");
ProducerConsumerSemaphores p3 = new ProducerConsumerSemaphores(que, "producer");
p3.setName("p3");
p1.start();
p2.start();
ProducerConsumerSemaphores c1 = new ProducerConsumerSemaphores(que, "consumer");
c1.setName("c1");
c1.start();
ProducerConsumerSemaphores c2 = new ProducerConsumerSemaphores(que, "consumer");
c2.setName("c2");
c2.start();
p3.start();
ProducerConsumerSemaphores c3 = new ProducerConsumerSemaphores(que, "consumer");
c3.setName("c3");
c3.start();
}
catch (Exception e) {
System.out.println("Exception occurred");
}
}
}
sample output
Starting worker producer p2
Starting worker producer p1
Starting worker consumer c1
Starting worker consumer c3
Starting worker producer p3
Starting worker consumer c2
p2 produced 87
p1 produced 61
p3 produced 90
c1 consumed 87
Exception in thread "c2" java.util.NoSuchElementException
at java.base/java.util.LinkedList.removeFirst(LinkedList.java:274)
at java.base/java.util.LinkedList.remove(LinkedList.java:689)
at BoundedQueue.pop(ProducerConsumerSemaphores.java:42)
at ProducerConsumerSemaphores.run(ProducerConsumerSemaphores.java:85)
p2 produced 88
c3 consumed 88
p2 produced 89
What is going wrong, any idea?