理解线程池,自己实现一个线程池

线程池本质是一个生产者-消费者模式,一边维护一些线程执行任务,一边由主线程添加一些任务。现在我们抛弃源码中一些繁杂的状态判断,自己写一个线程池。

public class poolT {
   //可能频繁增删任务,链表队列效率较高
private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); private final HashSet<Work> workers = new HashSet<Work>(); private static int num = 3; public poolT(int num) { this.num = num; for (int i = 0; i < num; i++) { Work w = new Work(); w.start(); workers.add(w); } } public void addWork(Runnable r) { workQueue.add(r); } public void close() throws Exception { while (!workQueue.isEmpty()) { Thread.sleep(500); } for (Work work : workers) { // 通知正在运行的结束 work.setDrop(); // 强制结束还在等待的 if (work.getState() == Thread.State.WAITING) { work.interrupt(); } } Thread.sleep(2000); for (Work work : workers) { System.out.println(work.getName() + "状态:" + work.getState()); } } // 内部线程封装 private class Work extends Thread { Runnable r = null; // 结束线程标志位 private boolean hasRunning = true; public void setDrop() { this.hasRunning = false; } public void run() { try { while (hasRunning || !workQueue.isEmpty()) { // 阻塞线程执行 r = workQueue.take(); if (r != null) { r.run(); } } } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { poolT p = new poolT(4); for (int i = 0; i < 2; i++) { Runnable newRun = new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "运行任务;"); } catch (InterruptedException e) { e.printStackTrace(); } } }; p.addWork(newRun); } p.close(); System.out.println("主程序完毕"); } }

这里面我使用了一个阻塞队列,当任务添加时,由队列随机选取一个空闲线程进行处理,没有任务时,进行阻塞。

当然也可以不用阻塞队列,不过需要自己进行同步

public class MyThreadPool {

    List<Runnable> taskList = new LinkedList<Runnable>();

    private List<MyThread> threadList = new LinkedList<MyThread>();

    private static MyThreadPool threadPool;

    public MyThreadPool(int num) {
        for (int i = 0; i < num; i++) {
            threadList.add(new MyThread());
        }
        for (MyThread thread : threadList) {
            thread.start();
        }
    }

      public void destroy() {  
            while (!taskList.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧  
                try {  
                    Thread.sleep(10);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            // 工作线程停止工作,且置为null  
            for (MyThread thread : threadList) {
                thread.setDistroy();
            }
        }  
      
    public void execute(Runnable run) {

        synchronized (taskList) {
            taskList.add(run);
            taskList.notify();
        }
    }

    private class MyThread extends Thread {
        public boolean hasRun = true;

        private void setDistroy() {
            this.hasRun = false;
        }

        @Override
        public void run() {
            while (hasRun) {
                Runnable r = null;
                System.out.println(Thread.currentThread().getName() + "is running");
                synchronized (taskList) {
                    if (taskList.isEmpty() && hasRun) {
                        try {
                            taskList.wait(20);
                        } catch (InterruptedException e) {

                            e.printStackTrace();
                        }
                    } else {
                        r = taskList.remove(0);
                    }
                }
                if (r != null) {
                    r.run();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
//         ExecutorService excutor=Executors.newFixedThreadPool(3);
        MyThreadPool pool =new MyThreadPool(4);
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                    System.out.println("任务一");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        });
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                    System.out.println("任务贰");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        });

        System.out.println("End");
        pool.destroy();
    }
}

 参考:http://blog.csdn.net/hsuxu/article/details/8985931

原文地址:https://www.cnblogs.com/wanglao/p/6600349.html