SimpleThreadPool实践

前言

并发(Concurrency)一直谈论java绕不开的一个话题,从移动开发工程师到后端工程师,几乎所有的面试都要涉及到并发/多线程的一些问题。虽然多数时候我们使用线程池,都是已经实现好的框架——jdk7中就有现成的ThreadPoolExecutor供我们使用,不过,自己实现一个简化的线程池,对于帮助我们理解其内部原理还是有一些帮助的。

设计

核心思想如下:

  • 线程池实例为单例
  • 线程池实例中保存着一个线程数组,用来分发任务
  • 线程池中通过一个BlockingQueue实例,来实现FIFO的任务队列,这个实例同时被线程数组中的每一个线程拥有
  • 线程通过while循环,不断从队列中取出任务执行(Runnable

实现

首先是线程池示例SimpleThreadPool.java,使用单例模式。

public class SimpleThreadPool {
    private static SimpleThreadPool ourInstance;
    private static final int QUEUE_SIZE = 100; // todo 调整 最优的capacity
    private PoolThread[] threadArray;
    private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
    private boolean isStopped;

    public static SimpleThreadPool getInstance() {
        if (ourInstance == null) {
            ourInstance = new SimpleThreadPool();
        }
        return ourInstance;
    }

    public SimpleThreadPool initPoolSize(int size) {
        if (size < 1 || size > 10) {
            throw new RuntimeException("size must be 1~10!");
        }
        threadArray = new PoolThread[size];
        for (int i = 0; i < size; i++) {
            threadArray[i] = new PoolThread(blockingQueue);
            threadArray[i].start();
        }
        return this;
    }

    public synchronized SimpleThreadPool execute(Runnable runnable) {
        if (isStopped) {
            throw new IllegalStateException("Thread Pool is stopped!");
        }
        blockingQueue.offer(runnable);
        return this;
    }

    public synchronized void doStop() {
        for (PoolThread pt : threadArray) {
            pt.doStop();
        }
        isStopped = true;
    }

}

然后是自定义的线程池线程,PoolThread.java

是否需要在run、doStop两个方法前声明synchronized,存疑。

public class PoolThread extends Thread {
    private BlockingQueue<Runnable> blockingQueue;
    private boolean isStopped;

    public PoolThread(BlockingQueue<Runnable> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (!isStopped) {
            try {
                Runnable runnable = blockingQueue.take();
                System.out.println(getName() + " is running...");
                runnable.run();
            } catch (InterruptedException e) {
                // todo: log it
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void doStop() {
        interrupt();
        isStopped = true;
    }
}

 写个Main函数测试一下。

public class Main {
    public static void main(String[] args) {
        SimpleThreadPool.getInstance().initPoolSize(10).execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("1+1=" + (1 + 1));
            }
        }).execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("2+2=" + (2 + 2));
            }
        }).execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("3+3=" + (3 + 3));
            }
        }).execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("4+4=" + (4 + 4));
            }
        }).execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("5+5=" + (5 + 5));
            }
        })
        ;
        SimpleThreadPool.getInstance().doStop();
    }
}

示例输出:

Thread-9 is running...
1+1=2
Thread-9 is running...
2+2=4
Thread-9 is running...
3+3=6
Thread-3 is running...
4+4=8
Thread-4 is running...
5+5=10
原文地址:https://www.cnblogs.com/maozhige/p/5084302.html