自定义线程池实现

线程池的特点:

1,迅速响应.

2,线程之间无优先级.

3,线程执行时间短,不阻塞其他任务.

4,线程不可绑定操作,不可被跟踪.

优点:

1,对象线程不用重复的创建与销毁,节省时间,资源.

2,可以对线程的数量进行控制.

案例

线程池类:

  1 import java.util.LinkedList;
  2 
  3 public class ThreadPool extends ThreadGroup {
  4 
  5     private boolean isAlive;// 是否活着
  6 
  7     private LinkedList<Runnable> taskQueue;// 任务队列
  8 
  9     private int threadID;// 线程ID
 10 
 11     private static int threadPoolID;// 线程池ID
 12 
 13     public ThreadPool(int threadNums) {
 14         super("线程池-" + (threadPoolID++));
 15         setDaemon(true);// 设置线程组为守护线程组
 16         isAlive = true;// 设置线程组的初始状态
 17         taskQueue = new LinkedList<Runnable>();// 初始化任务队列
 18         for (int i = 0; i < threadNums; i++) {
 19             //  启动组中的线程(初始化线程,线程一直保持运行状态,不断地获取线程池里面的待执行任务)
 20             new PooledThread().start();
 21         }
 22     }
 23 
 24     /**
 25      * 运行任务的入口,线程安全(有序的到达线程池)
 26      * 
 27      * @param task
 28      */
 29     public synchronized void runTask(Runnable task) {
 30         // 如果线程池关闭状态,抛出异常
 31         if (!isAlive) {
 32             throw new IllegalStateException();
 33         }
 34         // 如果任务不为null
 35         if (task != null) {
 36             // 加入任务队列
 37             taskQueue.add(task);
 38             notify();// 激活该方法上的其他线程之一
 39         }
 40     }
 41 
 42     /**
 43      * 获取等待执行的任务,线程安全(保证一次获取一个,不重复)
 44      * @return
 45      * @throws InterruptedException
 46      */
 47     protected synchronized Runnable getTask()throws InterruptedException{
 48         //当线程池没有可运行的任务时,并且线程池在工作状态,挂起线程池
 49         while(taskQueue.size()==0){
 50             if(!isAlive){
 51                 return null;
 52             }
 53             wait();
 54         }
 55         //否则,返回一条任务
 56         return (Runnable)taskQueue.removeFirst();
 57     }
 58     
 59     /**
 60      * 关闭线程池,停止所有在运行的任务
 61      */
 62     public synchronized void close(){
 63         //如果线程池是活的
 64         if(isAlive){
 65             //关闭
 66             isAlive= false;//
 67             //清空任务
 68             taskQueue.clear();
 69             interrupt();//关闭线程组里面的所有线程(此方法不一定能够停止线程,只是更改了isInterrupted()方法的状态)
 70         }
 71     }
 72     
 73     /**
 74      * 关闭线程池,不停止在运行的任务
 75      */
 76     public void join(){
 77         synchronized(this){
 78             isAlive = false;
 79             notifyAll();
 80         }
 81         //执行未完成的任务
 82         Thread[] threads = new Thread[activeCount()];//创建池中的线程数量一致的数组
 83         int count = enumerate(threads);//复制本线程池中的线程到刚才创建的线程数组
 84         //然后依次执行线程数组中的线程
 85         for(int i=0;i<count;i++){
 86             try{
 87                 threads[i].join();//效果相当于执行
 88             }catch(InterruptedException ei){
 89                 ei.printStackTrace();
 90             }
 91         }
 92     }
 93     
 94     public class PooledThread extends Thread{
 95 
 96         /**
 97          * 创建线程的时候指定线程池
 98          * @param tg
 99          * @param threadPoolName
100          */
101         public PooledThread(){
102             super(ThreadPool.this,"线程(ID号)"+threadID++);
103         }
104         
105         /**
106          * 重写run方法
107          */
108         public void run(){
109             while(!isInterrupted()){
110                 Runnable task = null;
111                 try {
112                     task=getTask();
113                 } catch (Exception e) {
114                     e.printStackTrace();
115                 }
116                 if(task==null){
117                     return;
118                 }
119                 try {
120                     System.out.println(this.getId());
121                     task.run();
122                 } catch (Throwable e) {
123                     // 处理异常
124                     uncaughtException(this,e);
125                 }
126             }
127         }
128         
129         
130     }
131     
132     
133     
134 
135 }

测试方法:

public class Test {
    public static void main(String[] args){
        for(int i=0;i<args.length;i++){
            System.out.println(args[i]);
        }
        if(args.length!=2){
            System.out.println("测试自定义线程池开始");
            System.out.println("使用方法,两个参数[任务数,线程数]");
            System.out.println("任务数-int:需要执行的任务数");
            System.out.println("线程数-int:线程池初始化数量");
        }
        int numTasks = 20;
        int numThreads = 5;
        ThreadPool threadPool = new ThreadPool(numThreads);
        for(int i=0;i<numTasks;i++){
            threadPool.runTask(createTask(i));
        }
        threadPool.join();
    }
    
    private static Runnable createTask(final int taskID){
        return new Runnable() {
            
            @Override
            public void run() {
                System.out.println("任务"+taskID+"开始");
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("任务"+taskID+"结束");
            }
        };
    }
    
}




原文地址:https://www.cnblogs.com/qinggege/p/4977614.html