【JAVA】Java 线程池自实现

参考《Java并发编程的艺术》

接口

package pres.ndz.simple.thread;

import org.quartz.Job;

public interface ThreadPool<Job extends Runnable> {

    void execute(Job job);
    void shutdowm();
    void addWorkers(int num);
    void removeWorker(int num);
    int  getJobSize();

}


实现


package pres.ndz.simple.thread;


import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 实现默认线程池
 * 线程池中的线程不是由调用方来创建,而是初始化线程池时已经默认创建好了活跃线程,用的时候
 * 线程池会自动取线程执行(活跃线程一直在等待执行其实,很像spring)
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {

    // 线程池最大线程数量
    private static final int MAX_WORKER_NUMBERS = 10;
    // 默认线程池数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // 线程池最小线程数量
    private static final int MIN_WORKER_NUMBERS = 1;
    // 工作清单
    private final LinkedList<Job> jobs = new LinkedList<>();
    // 工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    // 工作者线程数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    // 线程编号生成
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool(){
        initializeWorkers(DEFAULT_WORKER_NUMBERS);
    }

    /**
     * 指定线程池大小,最大不能超过 MAX_WORKER_NUMBERS
     * 最小不能小于 MIN_WORKER_NUMBERS(0或负都将返回 MIN_WORKER_NUMBERS)
     */
    public DefaultThreadPool(int num){
        workerNum = num > MAX_WORKER_NUMBERS?MAX_WORKER_NUMBERS:num < MIN_WORKER_NUMBERS?
                MIN_WORKER_NUMBERS : num;
        initializeWorkers(num);
    }



    /**
     * 初始化线程工作者
     * 初始完毕后会生成给定数量的Thread 一直查看jobs的数量 ,大于0就执行
     */
    private void initializeWorkers(int num){
        for (int i = 0; i< num; i++){
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    /**
     * 调用方执行一个Job
     */
    @Override
    public void execute(Job job) {
        if (job != null) {
            // 添加一个工作,然后进行通知
            // 多线程环境下添加job,会有资源竞争抢锁问题,所以使用synchronized用来同步添加
            synchronized (jobs){
                jobs.addLast(job);
                // 当前线程添加完毕后,通知其他线程可以添加了
                jobs.notify();
            }
        }
    }

    /**
     * 停止所有的Worker(线程池全部关闭)
     */
    @Override
    public void shutdowm() {
        for (Worker worker : workers){
            worker.shutdown();
        }
    }

    /**
     * 添加给定数量的Worker
     */
    @Override
    public void addWorkers(int num) {
        synchronized (jobs){
            // 限制新增的worker不能超过最大值
            if (num + this.workerNum > MAX_WORKER_NUMBERS){
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWorkers(num);
            this.workerNum += num;
        }
    }

    /**
     * 移除(停止)给定数量停止Worker(线程池数量递减)
     */
    @Override
    public void removeWorker(int num) {
        synchronized (jobs){
            if (num >= this.workerNum){
                throw new IllegalStateException("beyond workNum");
            }
            int count = 0;
            while (count < num){
                Worker worker = workers.get(count);
                if (workers.remove(worker)){
                    worker.shutdown();
                    count ++;
                }
            }

        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }

    /**
     * 工作者(任务),负责消费任务
     */
    class Worker implements Runnable{

        // 是否工作
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running){
                Job job = null;

                synchronized (jobs){
                    while (jobs.isEmpty()){
                        try {
                            jobs.wait();
                        }catch (InterruptedException ex){
                            // 感知到外部对WorkerThread的中断操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // 从工作清单中取出一个Job
                    job = jobs.removeFirst();
                }

                if(job != null ){
                    try {
                        job.run();
                    }catch (Exception ex){
                        // 忽略Job执行中的异常
                    }
                }
            }

        }


        public void shutdown(){
            running = false;
        }
    }
}

测试


public class GarbageTest {


    @Test
    public void test() throws InterruptedException {
        ThreadPool pool = new DefaultThreadPool(2);

        for (int i = 0; i < 10; i++) {

            if(i == 5) pool.removeWorker(1);

            pool.execute(()-> {
                System.out.printf("Thread is [%s], Job start .... 
",Thread.currentThread().getName());
                try {
                    Thread.sleep(2*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

        }


        Thread.sleep(60*10000);


    }

}

结果


Thread is [ThreadPool-Worker-1], Job start .... 
Thread is [ThreadPool-Worker-2], Job start .... 
Thread is [ThreadPool-Worker-2], Job start .... 
Thread is [ThreadPool-Worker-2], Job start .... 
Thread is [ThreadPool-Worker-2], Job start .... 
Thread is [ThreadPool-Worker-2], Job start .... 
Thread is [ThreadPool-Worker-2], Job start .... 
Thread is [ThreadPool-Worker-2], Job start .... 
Thread is [ThreadPool-Worker-2], Job start .... 
Thread is [ThreadPool-Worker-2], Job start .... 

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

“年轻时,我没受过多少系统教育,但什么书都读。读得最多的是诗,包括烂诗,我坚信烂诗早晚会让我邂逅好诗。” by. 马尔克斯
原文地址:https://www.cnblogs.com/jzsg/p/11028772.html