1405. Problem - Custom Thread Pool
Thread Pool and BlockingQueue
Implement a thread pool with blocking queue.
1. Requirement
Implement a thread pool with blocking queue. Use BlockingQueue
to store the tasks from client. The Worker
will take one task from BlockingQueue and run it.
2. Solution
Customized ThreadPool.
import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ThreadPool { private BlockingQueue bq ; private List<Worker> threads; private boolean isStopped; private int capacity; public ThreadPool(int numOfThreads) { isStopped = false; capacity = 100; bq = new ArrayBlockingQueue(capacity); threads = new ArrayList<>(); for (int i = 0; i < numOfThreads; i++) { threads.add(new Worker(i + 1, bq)); } for (Worker worker : threads) { worker.start(); } System.out.println("Thread pool is ready..."); } public synchronized void execute(Runnable task) throws Exception { if (this.isStopped) { throw new IllegalStateException("ThreadPool is stopped"); } this.bq.put(task); } public synchronized void shutdown() { try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } this.isStopped = true; for (Worker thread : threads) { thread.doStop(); } } }
Worker extends the Thread class. Call BlockingQueue.take() method to get task and execute it.
class Worker extends Thread { private int id; private BlockingQueue<Task> bq; private boolean isStopped; public Worker(int id, BlockingQueue bq) { this.id = id; this.bq = bq; this.isStopped = false; } public void run(){ while(!isStopped()){ try { Task task = bq.take(); System.out.println("Worker #" + id + " is working on the task: " + task.getName()); task.run(); } catch(Exception e){ //log or otherwise report exception, //but keep worker thread alive. } } } public synchronized void doStop(){ if (!this.isInterrupted()) { try { this.interrupt(); //break worker thread } catch (SecurityException ignore) { } finally { } } isStopped = true; } public synchronized boolean isStopped(){ return isStopped; } }
Real task needs to be executed.
public class Task implements Runnable { private String name; private Random random; public Task(String name) { this.name = name; this.random = new Random(); } public String getName() { return name; } public void run() { try { Long duration = (long) (random.nextInt(10)); System.out.println("Executing : " + name + " at " + LocalDateTime.now().toString()); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } }
Create a thread pool with 2 workers and create 5 tasks to be executed by the thread pool.
public class ThreadPoolExample { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2); List<Task> list = new ArrayList<>(); for (int i = 1; i <= 5; i++) { Task task = new Task("Task" + i); list.add(task); System.out.println("Created : " + task.getName()); } for (Task task : list) { try { threadPool.execute(task); } catch (Exception e) { e.printStackTrace(); } } threadPool.shutdown(); } }
Output.
Thread pool is ready...
Created : Task1
Created : Task2
Created : Task3
Created : Task4
Created : Task5
Worker #2 is working on the task: Task2
Worker #1 is working on the task: Task1
Executing : Task2 at 2020-04-17T21:24:56.042
Executing : Task1 at 2020-04-17T21:24:56.042
Worker #1 is working on the task: Task3
Executing : Task3 at 2020-04-17T21:24:56.099
Worker #2 is working on the task: Task4
Executing : Task4 at 2020-04-17T21:25:04.100
Worker #1 is working on the task: Task5
Executing : Task5 at 2020-04-17T21:25:05.102