线程池简单实现

实现了一个简化版的线程池。

实现线程池的关键有两个:一是阻塞队列,用于任务的存取,二是内部的线程对象如何持续性的执行任务,并在空闲时被回收。

线程池代码:

  1 package learnConcurrent;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Collection;
  5 import java.util.LinkedList;
  6 import java.util.List;
  7 import java.util.concurrent.ArrayBlockingQueue;
  8 import java.util.concurrent.BlockingQueue;
  9 import java.util.concurrent.Callable;
 10 import java.util.concurrent.ExecutionException;
 11 import java.util.concurrent.ExecutorService;
 12 import java.util.concurrent.Future;
 13 import java.util.concurrent.TimeUnit;
 14 import java.util.concurrent.TimeoutException;
 15 import java.util.concurrent.atomic.AtomicBoolean;
 16 import java.util.concurrent.atomic.AtomicInteger;
 17 import java.util.concurrent.locks.ReentrantLock;
 18 
 19 public class MyThreadPool implements ExecutorService{
 20     //线程队列
 21     private List<Worker> workers;
 22     //任务队列
 23     private BlockingQueue<Runnable> rQueue;
 24     //线程池核心大小
 25     private int corePoolSize;
 26     //线程池最大大小
 27     private int maxPoolSize;
 28     //空闲线程最长存活时间
 29     private int keepAliveTime = 60;
 30 
 31     private static final int ALIVE = 0;
 32     
 33     private static final int SHUTDOMN = 1;
 34     
 35     private int state = ALIVE;
 36     
 37     private ReentrantLock lock = new ReentrantLock();
 38     
 39     public MyThreadPool(int corePoolSize, int maxPoolSize){
 40         this.corePoolSize = corePoolSize;
 41         this.maxPoolSize = maxPoolSize;
 42         
 43         this.workers = new LinkedList<Worker>();
 44         //阻塞队列,最大容量为maxPoolSize
 45         this.rQueue = new ArrayBlockingQueue<Runnable>(maxPoolSize, true);
 46     }
 47     
 48     @Override
 49     public void execute(Runnable command) {
 50         if(isShutdown())
 51             return;
 52         //FIXME size在获取时和判断时 可能发生改变
 53         lock.lock();
 54         int size = workers.size();
 55         if(size < corePoolSize){//当线程池线程数小于核心数量时,增加线程
 56             addWorker();
 57         }else if(size < maxPoolSize && !rQueue.isEmpty()){//当线程大于核心数量且任务队列中任务排队时,增加线程
 58             addWorker();
 59         }
 60         lock.unlock();
 61         
 62         rQueue.offer(command);
 63     }
 64     
 65     @Override
 66     public void shutdown() {
 67         //关闭线程池的简单实现,设置状态让任务队列不在接受任务,线程也会因为超时被回收
 68         //缺点时空闲的线程资源得不到立即释放
 69         lock.lock();
 70         state = SHUTDOMN;
 71         lock.unlock();
 72     }
 73     
 74     /**
 75      * 立即停止线程池,试图停止正在活动的线程,返回还在等待的任务列表
 76      */
 77     @Override
 78     public List<Runnable> shutdownNow() {
 79         if(isShutdown())
 80             return null;
 81         lock.lock();
 82         state = SHUTDOMN;
 83         List<Runnable> restRunnable = new ArrayList<Runnable>();
 84         while(!rQueue.isEmpty()){
 85             restRunnable.add(rQueue.poll());
 86         }
 87         for(Worker w : workers){
 88             w.interrupt();
 89         }
 90         lock.unlock();
 91         return restRunnable;
 92     }
 93 
 94     @Override
 95     public boolean isShutdown() {
 96         lock.lock();
 97         boolean res = state != ALIVE;
 98         lock.unlock();
 99         return res;
100     }
101 
102     @Override
103     public boolean isTerminated() {
104         return isShutdown() && rQueue.isEmpty();
105     }
106 
107     @Override
108     public boolean awaitTermination(long timeout, TimeUnit unit)
109             throws InterruptedException {
110         // TODO Auto-generated method stub
111         return false;
112     }
113 
114     @Override
115     public <T> Future<T> submit(Callable<T> task) {
116         // TODO Auto-generated method stub
117         return null;
118     }
119 
120     @Override
121     public <T> Future<T> submit(Runnable task, T result) {
122         // TODO Auto-generated method stub
123         return null;
124     }
125 
126     @Override
127     public Future<?> submit(Runnable task) {
128         return null;
129     }
130 
131     @Override
132     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
133             throws InterruptedException {
134         return null;
135     }
136 
137     @Override
138     public <T> List<Future<T>> invokeAll(
139             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
140             throws InterruptedException {
141         return null;
142     }
143 
144     @Override
145     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
146             throws InterruptedException, ExecutionException {
147         return null;
148     }
149 
150     @Override
151     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
152             long timeout, TimeUnit unit) throws InterruptedException,
153             ExecutionException, TimeoutException {
154         return null;
155     }
156     
157     private Runnable getTask(){
158         Runnable r = null;
159         try {
160             r = rQueue.poll(keepAliveTime, TimeUnit.SECONDS);
161         } catch (InterruptedException e) {
162             // TODO Auto-generated catch block
163             e.printStackTrace();
164         }
165         return r;
166     }
167     
168     private void addWorker(){
169         Worker w = new Worker();
170         w.start();
171         lock.lock();
172         workers.add(w);
173         lock.unlock();
174     }
175     
176     private void removeWorker(Worker w){
177         lock.lock();
178         workers.remove(w);
179         lock.unlock();
180     }
181     
182     class Worker extends Thread{
183         
184         private AtomicBoolean isAlive = new AtomicBoolean(true);
185         
186         private Runnable task;
187         
188         
189         @Override
190         public void run() {
191             while(isAlive.get()){
192                 //阻塞一定时间,超时则回收该线程
193                 task = getTask();
194                 if(task != null){
195                     task.run();
196                 }else{
197                     isAlive.set(false);
198                     
199                 }
200                 task = null;
201             }
202             System.out.println("remove worker");
203             removeWorker(this);
204         }
205         
206     }
207     
208     
209 }

测试代码:

 1 package learnConcurrent;
 2 
 3 
 4 public class ThreadPoolTest {
 5     static int taskNo = 0;
 6     public static void main(String[] args) throws InterruptedException {
 7         MyThreadPool pool = new MyThreadPool(2, 5);
 8         
 9         for(int i=0; i< 50; i++){
10             Task task = new Task(taskNo++);
11             pool.execute(task);
12             Thread.sleep((int)(Math.random() * 1000));
13         }
14         
15     }
16     
17 }
18 
19 class Task implements Runnable{
20     String str;
21     public Task(int taskNo){
22         str = "TaskNo:" + taskNo;
23     }
24     @Override
25     public void run() {
26         System.out.println(str + " start work ");
27         //DO SOMETHING
28         try {
29             Thread.sleep((int)(Math.random() * 1000));
30         } catch (InterruptedException e) {
31             // TODO Auto-generated catch block
32             e.printStackTrace();
33         }
34         
35         System.out.println(str + " done ");
36     }
37     
38 }

虽然是继承了ExecutorService对象,但是只实现了几个接口,设计上也可能有未考虑到的问题。

测试代码也很简陋,仅供参考。

原文地址:https://www.cnblogs.com/insaneXs/p/7508328.html