高并发编程基础(线程池基础)

线程池简单基础介绍:

Executor:

  Executor是Java工具类,执行提交给它的Runnable任务。该接口提供了一种基于任务运行机制的任务提交方法,包括线程使用详细信息,时序等等。Executor通常用于替代创建多线程。

  提供一个execute(Runnable command)方法;我们一般用它的继承接口ExecutorService。里面就只有一个执行任务的接口,源码如下:

public interface Executor {

    void execute(Runnable command);
}

ExecutorService:

  它是线程池定义的一个接口,继承自Executor。有两个实现类,分别为ThreadPoolExecutor,ScheduledThreadPoolExecutor。除了继承自父类的 execute 执行方法 ,自己还定义了一系列方法,其中有两个重载提交任务的方法 submit 方法,参数可以是 Runnable 或者是 Callable类型的。

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

  这里既然提到了Callable 那就先来看一下这了接口跟Runnable:

Callable

public interface Callable<V> {
    V call() throws Exception;
}

Runnable:

public interface Runnable {
    public abstract void run();
}

相同点:

  1. 两者都是接口;
  2. 两者都可用来编写多线程程序;
  3. 两者都需要调用Thread.start()启动线程;

不同点:

  1. 两者最大的不同点是:实现Callable接口的任务线程能返回执行结果;而实现Runnable接口的任务线程不能返回结果;
  2. Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;

注意点:

  Callable接口支持返回执行结果,此时需要调用FutureTask.get()方法实现,此方法会阻塞主线程直到获取‘将来’结果;当不调用此方法时,主线程不会阻塞!

Executors:

  是java.util.concurrent包下的一个类,提供了若干个静态方法,用于生成不同类型的线程池的工具类,有点类似与Arrays。Executors一共可以创建下面这四类线程池:

  1. newCacheThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  3. newScheduledThreadPool 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
  4. newSingleThreadExecutor 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

ThreadPool:

  合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

  线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。用线程池来管理的好处是,可以保证系统稳定运行,适用与有大量线程,高工作量的情景下使用,假如要展示1000张图片如果创建1000个线程去加载,系统肯定会死掉。用线程池就可以避免这个问题,可以用5个线程轮流执行,5个一组,执行完的线程不直接回收而是等待下次执行,这样对系统的开销就可以减小不少。

public class T05_ThreadPool {
	public static void main(String[] args) {
		//创建一个5个线程的线程池
		ExecutorService service = Executors.newFixedThreadPool(5);
		for(int i=0;i<6;i++) {//往池子里仍了6个任务
			service.execute(()->{//睡500毫秒后打印线程名
			try {
				TimeUnit.MILLISECONDS.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName());
			});
		}
		//java.util.concurrent.ThreadPoolExecutor@119d7047[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
		System.out.println(service);
		service.shutdown();//关闭线程池 等待任务都执行完再关闭
		System.out.println(service.isTerminated());//false 任务是否都执行完
		System.out.println(service.isShutdown());//true 是不是关闭? 关闭了不代表任务执行完。
		//java.util.concurrent.ThreadPoolExecutor@119d7047[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
		System.out.println(service);
		try {
			TimeUnit.SECONDS.sleep(5);//睡5秒
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(service.isTerminated());//true
		System.out.println(service.isShutdown());//true
		//java.util.concurrent.ThreadPoolExecutor@119d7047[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
		System.out.println(service);
		
	}
}

Futrue: 

  在并发编程中,我们经常用到非阻塞的模型,在之前的多线程的三种实现中,不管是继承thread类还是实现runnable接口,都无法保证获取到之前的执行结果。通过实现Callback接口,并用Future可以来接收多线程的执行结果。
        Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。
        举个例子:比如去吃早点时,点了包子和凉菜,包子需要等3分钟,凉菜只需1分钟,如果是串行的一个执行,在吃上早点的时候需要等待4分钟,但是因为你在等包子的时候,可以同时准备凉菜,所以在准备凉菜的过程中,可以同时准备包子,这样只需要等待3分钟。那Future这种模式就是后面这种执行模式。
  
public class T06_Future {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		//FutureTask 区分 RunnableTask  也是实现了Runnable接口的
		FutureTask<Integer> task = new FutureTask<>(()-> {
			try {// 该任务将来会有个返回值是Integer类型  
				TimeUnit.MILLISECONDS.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return 1000;
		}); // new Callable()
		
		new Thread(task).start();
		
		System.out.println(task.get());//阻塞
		
		//************************************
		ExecutorService service = Executors.newFixedThreadPool(5);
		Future<Integer> f = service.submit(()->{// callable
			try {
				TimeUnit.MILLISECONDS.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return 1;
		});
		System.out.println(f.isDone());// 任务执行完没有啊?
		System.out.println(f.get()); // 阻塞  1
		System.out.println(f.isDone());
	}
}

ParallerComputing(并行计算)

newFixedThreadPool:(固定线程的线程池)

  测试例子:计算1-20W之间的质数数量,用一个线程的话计算时间会很长。我们可以用线程池来解决:

public class T07_ParallerComputing {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		long start = System.currentTimeMillis();
		// 获取 1-200000的质数,只能被1跟自身整除
		List<Integer> results = getPrime(1,200000);
		long end = System.currentTimeMillis();
		System.out.println(end - start);// 2000 左右
		
		final int cpuCoreNum =4;
		
		ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
		MyTask task1 = new MyTask(1, 80000);
		MyTask task2 = new MyTask(80001, 130000);
		MyTask task3 = new MyTask(130001, 170000);
		MyTask task4 = new MyTask(170001, 200000);
		
		start = System.currentTimeMillis();
		Future<List<Integer>> submit1 = service.submit(task1);
		Future<List<Integer>> submit2 = service.submit(task2);
		Future<List<Integer>> submit3 = service.submit(task3);
		Future<List<Integer>> submit4 = service.submit(task4);
		
		submit1.get();
		submit2.get();
		submit3.get();
		submit4.get();
		
		end = System.currentTimeMillis();
		System.out.println(end - start); // 800左右
	}

	static class MyTask implements Callable<List<Integer>> {
		int startPos, endPos;

		private MyTask(int startPos, int endPos) {
			this.startPos = startPos;
			this.endPos = endPos;
		}

		@Override
		public List<Integer> call() throws Exception {
			List<Integer> results = getPrime(startPos, endPos);
			return results;
		}
	}

	static boolean isPrime(int num) {// 是否是质数
		for (int i = 2; i < num / 2; i++) {
			if (num % i == 0)
				return false;
		}
		return true;
	}

	static List<Integer> getPrime(int start, int end) {
		List<Integer> results = new ArrayList<Integer>();
		for (int i = start; i < end; i++) {
			if (isPrime(i))
				results.add(i);
		}
		return results;
	}
}

  两次执行结果大致为 2067  ,698 .这说明用线程池会大大提高计算效率。

newCachedThreadPool:

  刚刚开始线程池里面没有线程,来一个任务启动一个线程,如果有空闲的线程就直接执行任务,没有空闲就另起一个线程,每个线程超过 60 秒的空闲时间,线程消失,可以自己指定生存时间。

public class T08_CachedPool {

	public static void main(String[] args) throws InterruptedException {
		ExecutorService service = Executors.newCachedThreadPool();
		//java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
		System.out.println(service);
		for(int i=0;i<2;i++) {
			service.execute(()->{//睡500毫秒后打印线程名
				try {
					TimeUnit.MILLISECONDS.sleep(500);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName());
				});
		}
		//java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
		System.out.println(service);
		
		TimeUnit.SECONDS.sleep(80);
		
		//java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
		System.out.println(service);
	}
}

newSingleThreadExecutor:

  线程池里面只有一个线程,代码如下:

public class T09_SingleThreadPool {

	public static void main(String[] args) throws InterruptedException {
		ExecutorService service = Executors.newSingleThreadExecutor();
		for (int i = 0; i < 5; i++) {
			final int j = i;
			service.execute(() -> { 
				System.out.println(j + " " + Thread.currentThread().getName());
			});
//			0 pool-1-thread-1
//			1 pool-1-thread-1
//			2 pool-1-thread-1
//			3 pool-1-thread-1
//			4 pool-1-thread-1
		}
	}
}

  这个可以保证任务的先后执行顺序,打印出来的结果是按顺序的,且只有一个线程去执行。

newScheduledThreadPool:定时器线程池

  以下小程序是启动后0秒开始执行,每隔500毫秒执行一次任务

public class T09_SingleThreadPool {

	public static void main(String[] args) throws InterruptedException {
		  ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
		  service.scheduleAtFixedRate(()->{//以固定的频率来执行任务
			  try {
				TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			  System.out.println(Thread.currentThread().getName());
		  }, 0, 500, TimeUnit.MILLISECONDS);// 起始延迟多久后执行 ,每隔500毫秒执行一次,时间单位
	}
}

newWorkStealingPool:精灵线程(守护线程,后台线程)

  任务窃取:线程池中每个线程都维护着自己的任务队列,当某一个线程队列执行空了,他会去另外的线程中去拿一个任务来执行,不用去分配:

public class T11_WorkStealingPool {

	public static void main(String[] args) throws InterruptedException, IOException {
		ExecutorService service = Executors.newWorkStealingPool();
		//查看CPU是几核 我这里是8  默认启动8个线程java.util.concurrent.ForkJoinPool@55f96302[Running, parallelism = 8, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
		System.out.println(Runtime.getRuntime().availableProcessors());
		//java.util.concurrent.ForkJoinPool@55f96302[Running, parallelism = 8, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
		System.out.println(service);
		
		service.execute(new R(1000));
		service.execute(new R(1000));
		service.execute(new R(1000));
		service.execute(new R(1000));//daemon 精灵线程Debug 可以查看
		service.execute(new R(1000));
		
		//由于产生的是精灵线程(守护线程,后台线程),主线程不阻塞的话看不到输出
		System.in.read();
	}
	static class R implements Runnable{
		
		int time;
		
		public R(int time) {
			this.time =time;
		}

		@Override
		public void run() {
			try {
				TimeUnit.MILLISECONDS.sleep(time);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			  System.out.println(time +" "+ Thread.currentThread().getName());
		}
	}
}

   newWorkStealingPool的实现是ForkJoinPool;

ForkJoinPool :

  ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

  下面例子是从 0 加到 100W .:

public class T12_ForkJoinPool {
    
    static int[] nums= new int[1000000];
    static final int MAX_NUM =50000;
    static Random r=new Random();
    
    static {
        for(int i=0;i<nums.length;i++) {
            nums[i] = r.nextInt(100);
        }
        System.out.println(Arrays.stream(nums).sum());
    }
    //无返回值
    static class AddTask extends RecursiveAction{
        int start, end;

        private AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
        @Override
        protected void compute() {
            if(end -start <MAX_NUM) {
                long sum =0L;
                for(int i=start;i<end ;i++) sum += nums[i];
                System.out.println("from "+start +" to "+end+" = "+sum);
            }else {
                int middle =start +(end-start)/2;
                AddTask task1 = new AddTask(start, middle);
                AddTask task2 = new AddTask(middle, end);
                task1.fork();
                task2.fork();
            }
            
        }
        
    }
        //有返回值
    static class AddTask2 extends RecursiveTask<Long>{
        int start, end;
        
        private AddTask2(int start, int end) {
            this.start = start;
            this.end = end;
        }
        @Override
        protected Long compute() {
            if(end -start <MAX_NUM) {
                long sum =0L;
                for(int i=start;i<end ;i++) sum += nums[i];
                return sum;
            } 
                int middle =start +(end-start)/2;
                AddTask2 task1 = new AddTask2(start, middle);
                AddTask2 task2 = new AddTask2(middle, end);
                task1.fork();//启动新线程
                task2.fork();
                return task1.join() + task2.join();
            
        }
        
    }
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        
//        AddTask task = new AddTask(0, nums.length);
//        fjp.execute(task);
        AddTask2 task2 = new AddTask2(0, nums.length);
        fjp.execute(task2);
        long result= task2.join();//阻塞的
        System.out.println(result);
        System.in.read();
    }

}

ThreadPoolExecutor:

  在上诉讲到的线程池中 :newFixedThreadPool ,newCachedThreadPool,newSingleThreadExecutor,newScheduledThreadPool 的底层实现中都是用到了 ThreadPoolExecutor 来创建线程池。而newWorkStealingPool 的底层用的是 ForkJoinPool ,ForkJoinPool是1.8以后才加入的。

简单看一下几种线程池构造函数的简单实现:

newFixedThreadPool :new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); 指定的线程数,最大线程数,多长时间空闲消失,时间单位,队列。 0L代表永远不会消失。

newCachedThreadPool:new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

newSingleThreadExecutor:new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())

newScheduledThreadPool: super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());

newWorkStealingPool:new ForkJoinPool (Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true)第一个是CPU核数

 

原文地址:https://www.cnblogs.com/wuzhenzhao/p/9928639.html