java并发编程基础——线程池

线程池

由于启动一个线程要与操作系统交互,所以系统启动一个新的线程的成本是比较高的。在这种情况下,使用线程池可以很好的提升性能,特别是程序中涉及创建大量生命周期很短暂的线程时。

与数据库连接池类似,线程池在启动时就创建了大量的空闲的线程,程序将一个Runnable对象或者Callable对象传给线程池,线程池就会启动一个线程来执行他们的run()或call()方法,当方法执行结束后,线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一次执行。

线程池还可以有效的控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池中的最大线程参数可以控制系统中并发线程数不超过此数。

一、Executors工厂生成线程池

在java5开始,增加了一个Executors工厂类来生产线程池,它包含如下几个静态方法来生产线程池:

newCacheThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将会别缓存在线程池中。

newFixedThreadPool(int nThread):创建一个可重用的、具有固定线程数的线程池。

newSingleThreadExecutor():创建一个只有单线程的线程池,相当与newFixedThreadPool(1)。

newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在延迟后执行线程任务。

newSigleScheduleExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。

上面方法中,前3个返回ExecutorService对象线程池。后面两个返回ScheduleExecutorService对象线程池。ScheduleExecutorService是ExecutorService的子类,可以延迟执行线程。

线程池小例子:

package threadtest;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ThreadTest implements Runnable  {
 
    private static int i = 0;
    private synchronized void incre() {
        i++;
        System.out.println(Thread.currentThread().getName() +" "+ i);
    }
    @Override
    public void run() {
 
        for(int j=0;j<5;j++) {
            incre();
        }
    }
    public static void main(String[] args) {
 
        //生产一个线程池
        ExecutorService es = Executors.newFixedThreadPool(6);
        ThreadTest tt = new ThreadTest();
        es.submit(tt);
        es.submit(tt);
        es.submit(tt);
        es.shutdown();
    }
}

结果:

pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5
pool-1-thread-2 6
pool-1-thread-2 7
pool-1-thread-2 8
pool-1-thread-2 9
pool-1-thread-2 10
pool-1-thread-3 11
pool-1-thread-3 12
pool-1-thread-3 13
pool-1-thread-3 14
pool-1-thread-3 15

二、ForkJoinPool

java7提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的计算结果合并成总的计算结果。(跟进多核cpu时代)

ForkJoinPool是ExecutorService接口的实现类,所以它是一种特殊的线程池

ForkJoinPool有两个常用的构造器:

ForkJoinPool(int parallelism):创建包含parallelism个并行线程的ForkJoinPool

ForkJoinPool():以Runtime.availableProcessors()方法的返回值做为parallelism参数来创建ForkJoinPool。

ForkJoinPool创建后就可以调用它的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法来执行指定任务了。其中ForkJoinTask代表一个可以并行、合并的任务。

ForkJoinTask是一个抽象类,它还有两个抽象子类:RecursiveAction和RecursiveTask。其中RecursiveTask代表有返回值的任务,RecursiveAction代表无返回值的任务。

下面程序利用ForkJoinPool执行RecursiveAction任务,打印一段数字

package threadtest;
 
import java.util.concurrent.RecursiveAction;
 
/**
 * 继承RecursiveAction实现可分解的任务
 * @author rdb
 *
 */
public class PrintTask extends RecursiveAction{
 
    //每个任务对多打印50个数
    private static final int THRESHOLD = 50;
    private int start;
    private int end;
    public PrintTask(int start,int end) {
        this.start = start;
        this.end = end ;
    }
    @Override
    protected void compute() {
        //打印数小于50开始打印,否则分解任务
        if(end - start < THRESHOLD) {
            for(int i = start;i<end;i++) {
                System.out.println(Thread.currentThread().getName()+" "+ i);
            }
        }else {
            int mind = (end + start)/2;
            PrintTask left = new PrintTask(start, mind);
            PrintTask right = new PrintTask(mind, end);
            //分解任务
            left.fork();
            right.fork();
             
        }
    }
}
 
package threadtest;
 
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
 
public class ThreadTest {
 
public static void main(String[] args) throws InterruptedException {
         
        //构建ForkJoinPool
        ForkJoinPool pool = new ForkJoinPool();
        //提交任务
        pool.submit(new PrintTask(0, 300));
        pool.shutdown();
         
        //阻塞主线程,直到线程池关闭
        while(!pool.awaitTermination(2, TimeUnit.SECONDS)) {
            System.out.println("service not stop");
        }
        System.out.println("all thread complete");
    }
 
}

结果:从结果中可以看出,有个四个核在并行执行任务(电脑是4核处理器)

ForkJoinPool-1-worker-1 262
ForkJoinPool-1-worker-0 37
ForkJoinPool-1-worker-3 187
ForkJoinPool-1-worker-3 188
ForkJoinPool-1-worker-3 189
ForkJoinPool-1-worker-3 190
ForkJoinPool-1-worker-3 191
ForkJoinPool-1-worker-2 112
ForkJoinPool-1-worker-2 113
ForkJoinPool-1-worker-3 192
...
ForkJoinPool-1-worker-0 259
ForkJoinPool-1-worker-0 260
ForkJoinPool-1-worker-1 296
ForkJoinPool-1-worker-0 261
ForkJoinPool-1-worker-1 297
ForkJoinPool-1-worker-1 298
ForkJoinPool-1-worker-1 299
all thread complete

下面程序利用ForkJoinPool执行RecursiveTask任务,求数组的和并返回

package threadtest;
 
import java.util.concurrent.RecursiveTask;
/**
 * 继承RecursiveTask实现可分解的有返回值的任务
 * @author rdb
 *
 */
public class SumTask extends RecursiveTask<Integer>{
 
    //定义每个任务最多求和10个元素
        private final int THRESHOLD = 10;
        private int[] array;
        private int start;
        private int end;
        public SumTask(int[] array,int start,int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
        @Override
        protected Integer compute() {
             int sum = 0;
            if((end - start) < THRESHOLD) {
                for(int i = start;i<end;i++) {
                    sum += array[i];
                }
                return sum;
            }else {
                int mind = (end + start)/2 ;
                SumTask left = new SumTask(array, start, mind);
                SumTask right = new SumTask(array, mind, end);
                left.fork();
                right.fork();
                return left.join()+right.join();
            }
        }
 
}
 
package threadtest;
  
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
  
public class ThreadTest {
  
    public static void main(String[] args) throws Exception {
        int[] array = new int[100] ;
        int total = 0;
         
        for(int i = 0;i<array.length;i++) {
            array[i] = (int) (Math.random() * 10);
            total += array[i];
        }
        System.out.println(total);
        //JAVA8新加的通用池
        ForkJoinPool pool = ForkJoinPool.commonPool();
        Future<Integer> future = pool.submit(new SumTask(array, 0, array.length));
        System.out.println(future.get());
    }     
}

结果

437
437
原文地址:https://www.cnblogs.com/jnba/p/10636683.html