I am new to concurrent programming and am trying to implement a simple thread pool myself. I found this implementation on a learning website (jenkov.com), and it seems to work fine. However, I think the thread and isStopped fields in the PoolThreadRunnable class should be at least volatile, or atomic variables, because they are shared by two threads: the one that calls doStop() on the object (where isStopped = true is set and this.thread.interrupt() is called) and the pool thread that executes run() (which does this.thread = Thread.currentThread() and loops on while(!isStopped())). Is my understanding correct, or am I missing something?
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ThreadPool {

    private BlockingQueue taskQueue = null;
    private List<PoolThreadRunnable> runnables = new ArrayList<>();
    private boolean isStopped = false;

    public ThreadPool(int noOfThreads, int maxNoOfTasks){
        taskQueue = new ArrayBlockingQueue(maxNoOfTasks);

        for(int i=0; i<noOfThreads; i++){
            PoolThreadRunnable poolThreadRunnable =
                    new PoolThreadRunnable(taskQueue);
            runnables.add(poolThreadRunnable);
        }
        for(PoolThreadRunnable runnable : runnables){
            new Thread(runnable).start();
        }
    }

    public synchronized void execute(Runnable task) throws Exception{
        if(this.isStopped) throw
                new IllegalStateException("ThreadPool is stopped");

        this.taskQueue.offer(task);
    }

    public synchronized void stop(){
        this.isStopped = true;
        for(PoolThreadRunnable runnable : runnables){
            runnable.doStop();
        }
    }

    public synchronized void waitUntilAllTasksFinished() {
        while(this.taskQueue.size() > 0) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

import java.util.concurrent.BlockingQueue;

public class PoolThreadRunnable implements Runnable {

    private Thread thread = null;
    private BlockingQueue taskQueue = null;
    private boolean isStopped = false;

    public PoolThreadRunnable(BlockingQueue queue){
        taskQueue = queue;
    }

    public void run(){
        this.thread = Thread.currentThread();
        while(!isStopped()){
            try{
                Runnable runnable = (Runnable) taskQueue.take();
                runnable.run();
            } catch(Exception e){
                //log or otherwise report exception,
                //but keep pool thread alive.
            }
        }
    }

    public synchronized void doStop(){
        isStopped = true;
        //break pool thread out of the blocking take() call.
        this.thread.interrupt();
    }

    public synchronized boolean isStopped(){
        return isStopped;
    }
}
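
To make my concern concrete, here is a minimal sketch of the variant I have in mind. It is my own hypothetical rewrite, not the jenkov.com code: the names SafePoolThreadSketch, Worker and runDemo are made up for illustration, both shared fields are declared volatile instead of relying on synchronized methods, and doStop() checks for null in case run() has not started yet.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: all names here are assumptions, not part of the original code.
class SafePoolThreadSketch {

    static class Worker implements Runnable {
        private final BlockingQueue<Runnable> taskQueue;
        // volatile: written by the pool thread in run(), read by the caller of doStop().
        private volatile Thread thread;
        // volatile: written by the caller of doStop(), read by the pool thread.
        private volatile boolean stopped;

        Worker(BlockingQueue<Runnable> taskQueue) {
            this.taskQueue = taskQueue;
        }

        @Override
        public void run() {
            thread = Thread.currentThread();
            while (!stopped) {
                try {
                    taskQueue.take().run();
                } catch (InterruptedException e) {
                    // Interrupted while blocked in take(); the loop re-checks the flag.
                } catch (RuntimeException e) {
                    // Report the task failure here, but keep the pool thread alive.
                }
            }
        }

        void doStop() {
            stopped = true;
            Thread t = thread;   // read the volatile field once
            if (t != null) {     // still null if run() has not started yet
                t.interrupt();
            }
        }
    }

    // Runs one task through a worker, stops the worker, and reports
    // whether the task ran and the pool thread terminated.
    static boolean runDemo() {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(4);
        CountDownLatch ran = new CountDownLatch(1);
        Worker worker = new Worker(queue);
        Thread poolThread = new Thread(worker);
        poolThread.start();
        try {
            queue.put(ran::countDown);
            boolean taskRan = ran.await(2, TimeUnit.SECONDS);
            worker.doStop();
            poolThread.join(2000);
            return taskRan && !poolThread.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker stopped cleanly: " + runDemo());
    }
}
```

As I understand it, with both fields volatile either doStop() sees the thread reference and interrupts it, or run() sees stopped == true before it blocks in take(), so the worker should not be left waiting forever. I would like to know whether this reasoning holds and whether the original version really needs such a change.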