读书笔记-java并发编程-核心方法与框架1-5

  • Semaphore的使用
    单词Semaphore[' seməf :(r)]的中文含义是信号、信号系统。此类的主要作用就是限制线程并发的数量,如果不限制线程并发的数量,则CPU的资源很快就被耗尽,每个线程执行的任务是相当缓慢,因为CPU要把时间片分配给不同的线程对象,而且上下文切换也要耗时,最终造成系统运行效率大幅降低,所以限制并发线程的数量还是非常有必要的
public class Service {
    private Semaphore semaphore = new Semaphore(1);

    public void testMethod() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()
                    + " begin timer=" + System.currentTimeMillis());
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName()
                    + "   end timer=" + System.currentTimeMillis());
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 类Semaphore的构造函数参数permits是许可的意思,代表同一时间内,最多允许多少个线程同时执行acquire()和release()之间的代码
  • 其实还可以传入>1的许可,代表同一时间内,最多允许有x个线程可以执行acquire()和release()之间的代码。
  • 类Semaphore所提供的功能完全就是synchronized关键字的升级版
  • 当Semaphore的数量大于1时将不能保证线程安全
  • Semaphore的数量可以动态增加,当release的数量大于初始化数量时会自动增加:
    public static void main(String[] args) {
        try {
            Semaphore semaphore = new Semaphore(5);
            semaphore.acquire();
            semaphore.acquire();
            semaphore.acquire();
            semaphore.acquire();
            semaphore.acquire();
            System.out.println(semaphore.availablePermits());
            semaphore.release();
            semaphore.release();
            semaphore.release();
            semaphore.release();
            semaphore.release();
            semaphore.release();
            System.out.println(semaphore.availablePermits());
            semaphore.release(4);
            System.out.println(semaphore.availablePermits());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
结果:
0
6
10

  • 方法acquireUninterruptibly()的作用是使等待进入acquire()方法的线程,不允许被中断。
  • 当调用interrupt方法时,在执行的线程不会被打断,仍然会执行完成
  • availablePermits()返回此Semaphore对象中当前可用的许可数,此方法通常用于调试,因为许可的数量有可能实时在改变,并不是固定的数量。drainPermits()可获取并返回立即可用的所有许可个数,并且将可用许可置0。
  • 方法getQueueLength()的作用是取得等待许可的线程个数。方法hasQueuedThreads()的作用是判断有没有线程在等待这个许可。这两个方法通常都是在判断当前有没有等待许可的线程信息时使用
public void testMethod() {
		try {
			semaphore.acquire();
			Thread.sleep(1000);
			System.out.println("还有大约" + semaphore.getQueueLength() + "个线程在等待");
			System.out.println("是否有线程正在等待信号量呢?" + semaphore.hasQueuedThreads());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			semaphore.release();
		}
	}
  • 公平与非公平信号量: 公平信号量是获得锁的顺序与线程启动的顺序有关,但不代表100%地获得信号量,仅仅是在概率上能得到保证。而非公平信号量就是无关的了。
  • tryAcquire(): 无参方法tryAcquire()的作用是尝试地获得1个许可,如果获取不到则返回false,此方法通常与if语句结合使用,其具有无阻塞的特点。无阻塞的特点可以使线程不至于在同步处一直持续等待的状态,如果if语句判断不成立则线程会继续走else语句,程序会继续向下运行
public class Service {

	private Semaphore semaphore = new Semaphore(1);

	public void testMethod() {
		if (semaphore.tryAcquire()) {
			System.out.println("ThreadName=" + Thread.currentThread().getName()
					+ "首选进入!");
			for (int i = 0; i < Integer.MAX_VALUE; i++) {
				String newString = new String();
				Math.random();
			}
			semaphore.release();
		} else {
			System.out.println("ThreadName=" + Thread.currentThread().getName()
					+ "未成功进入!");
		}
	}
}
  • 有参方法tryAcquire(int permits)的作用是尝试地获得x个许可,如果获取不到则返回false
  • 有参方法tryAcquire(int long timeout, TimeUnit unit)的作用是在指定的时间内尝试地获得1个许可,如果获取不到则返回false。
  • 有参方法tryAcquire(int permits, long timeout, TimeUnit unit)的作用是在指定的时间内尝试地获得x个许可,如果获取不到则返回false。
  • 使用Semaphore创建字符串池
  • 类Semaphore可以有效地对并发执行任务的线程数量进行限制,这种功能可以应用在pool池技术中,可以设置同时访问pool池中数据的线程数量。本实验的功能是同时有若干个线程可以访问池中的数据,但同时只有一个线程可以取得数据,使用完毕后再放回池中。
public class ListPool {

	private int poolMaxSize = 5;
	private int semaphorePermits = 5;
	private List<String> list = new ArrayList<String>();
	private Semaphore concurrencySemaphore = new Semaphore(semaphorePermits);
	private ReentrantLock lock = new ReentrantLock();
	private Condition condition = lock.newCondition();

	public ListPool() {
		super();
		for (int i = 0; i < poolMaxSize; i++) {
			list.add("高洪岩" + (i + 1));
		}
	}

	public String get() {
		String getString = null;
		try {
			concurrencySemaphore.acquire();
			lock.lock();
			while (list.size() == 0) {
				condition.await();
			}
			getString = list.remove(0);
			lock.unlock();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return getString;
	}

	public void put(String stringValue) {
		lock.lock();
		list.add(stringValue);
		condition.signalAll();
		lock.unlock();
		concurrencySemaphore.release();
	}

}
  • 使用Semaphore实现多生产者/多消费者模式
public class RepastService {

	volatile private Semaphore setSemaphore = new Semaphore(10);// 厨师
	volatile private Semaphore getSemaphore = new Semaphore(20);// 就餐者
	volatile private ReentrantLock lock = new ReentrantLock();
	volatile private Condition setCondition = lock.newCondition();
	volatile private Condition getCondition = lock.newCondition();
	volatile private Object[] producePosition = new Object[4];

	private boolean isEmpty() {
		boolean isEmpty = true;
		for (int i = 0; i < producePosition.length; i++) {
			if (producePosition[i] != null) {
				isEmpty = false;
				break;
			}
		}
		if (isEmpty == true) {
			return true;
		} else {
			return false;
		}
	}

	private boolean isFull() {
		boolean isFull = true;
		for (int i = 0; i < producePosition.length; i++) {
			if (producePosition[i] == null) {
				isFull = false;
				break;
			}
		}
		return isFull;
	}

	public void set() {
		try {
			// System.out.println("set");
			setSemaphore.acquire();// 允许同时最多有10个厨师进行生产
			lock.lock();
			while (isFull()) {
				// System.out.println("生产者在等待");
				setCondition.await();
			}
			for (int i = 0; i < producePosition.length; i++) {
				if (producePosition[i] == null) {
					producePosition[i] = "数据";
					System.out.println(Thread.currentThread().getName()
							+ " 生产了 " + producePosition[i]);
					break;
				}
			}
			getCondition.signalAll();
			lock.unlock();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			setSemaphore.release();
		}
	}

	public void get() {
		try {
			// System.out.println("get");
			getSemaphore.acquire();// 允许同时最多有16个就餐者
			lock.lock();
			while (isEmpty()) {
				// System.out.println("消费者在等待");
				getCondition.await();
			}
			for (int i = 0; i < producePosition.length; i++) {
				if (producePosition[i] != null) {
					System.out.println(Thread.currentThread().getName()
							+ " 消费了 " + producePosition[i]);
					producePosition[i] = null;
					break;
				}
			}
			setCondition.signalAll();
			lock.unlock();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			getSemaphore.release();
		}
	}

}
  • 类Exchanger的功能可以使2个线程之间传输数据,它比生产者/消费者模式使用的wait/notify要更加方便。
  • 类Exchanger中的exchange()方法具有阻塞的特色,也就是此方法被调用后等待其他线程来取得数据,如果没有其他线程取得数据,则一直阻塞等待。
  • 当调用exchange(V x, long timeout, TimeUnit unit)方法后在指定的时间内没有其他线程获取数据,则出现超时异常

第二章

  • 类CountDownLatch也是一个同步功能的辅助类,使用效果是给定一个计数,当使用这个CountDownLatch类的线程判断计数不为0时,则呈wait状态,如果为0时则继续运行
  • 实现等待与继续运行的效果分别需要使用await()和countDown()方法来进行。调用await()方法时判断计数是否为0,如果不为0则呈等待状态。其他线程可以调用count-Down()方法将计数减1,当计数减到为0时,呈等待的线程继续运行。而方法getCount()就是获得当前的计数个数。
public class MyService {

	private CountDownLatch down = new CountDownLatch(1);

	public void testMethod() {
		try {
			System.out.println("A");
			down.await();
			System.out.println("B");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public void downMethod() {
		System.out.println("X");
		down.countDown();
	}

}

	public static void main(String[] args) throws InterruptedException {
		MyService service = new MyService();
		MyThread t = new MyThread(service);
		t.start();
		Thread.sleep(2000);
		service.downMethod();
	}
只有执行了downMethod方法后,阻塞得以解除
  • 裁判在等全部的运动员到来:多个线程与同步点间阻塞的特性,线程必须都到达同步点后才可以继续向下运行
public class MyThread extends Thread {

	private CountDownLatch maxRuner;

	public MyThread(CountDownLatch maxRuner) {
		super();
		this.maxRuner = maxRuner;
	}

	@Override
	public void run() {
		try {
			Thread.sleep(20000);
			maxRuner.countDown();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

public static void main(String[] args) throws InterruptedException {
		CountDownLatch maxRuner = new CountDownLatch(10);
		MyThread[] tArray = new MyThread[Integer.parseInt(""
				+ maxRuner.getCount())];
		for (int i = 0; i < tArray.length; i++) {
			tArray[i] = new MyThread(maxRuner);
			tArray[i].setName("线程" + (i + 1));
			tArray[i].start();
		}
		maxRuner.await();
		System.out.println("都回来了!");
	}
  • 各就各位准备比赛
public class MyService {

	private CountDownLatch down = new CountDownLatch(1);

	public void testMethod() {
		try {
			System.out.println(Thread.currentThread().getName() + "准备");
			down.await();
			System.out.println(Thread.currentThread().getName() + "结束");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public void downMethod() {
		System.out.println("开始");
		down.countDown();
	}

}

public class Run {
	public static void main(String[] args) throws InterruptedException {
		MyService service = new MyService();
		MyThread[] tArray = new MyThread[10];
		for (int i = 0; i < tArray.length; i++) {
			tArray[i] = new MyThread(service);
			tArray[i].setName("线程" + (i + 1));
			tArray[i].start();
		}
		Thread.sleep(2000);
		service.downMethod();
	}
}
存在的问题:如果有的运动员2秒内没有准备好,仍然会开始
  • 改进:
public class MyThread extends Thread {

	private CountDownLatch comingTag;// 裁判等待所有运动员到来
	private CountDownLatch waitTag;// 等待裁判说准备开始
	private CountDownLatch waitRunTag;// 等待起跑
	private CountDownLatch beginTag;// 起跑
	private CountDownLatch endTag;// 所有运动员到达终点

	public MyThread(CountDownLatch comingTag, CountDownLatch waitTag,
			CountDownLatch waitRunTag, CountDownLatch beginTag,
			CountDownLatch endTag) {
		super();
		this.comingTag = comingTag;
		this.waitTag = waitTag;
		this.waitRunTag = waitRunTag;
		this.beginTag = beginTag;
		this.endTag = endTag;
	}

	@Override
	public void run() {
		try {
			System.out.println("运动员使用不同交通工具不同速度到达起跑点,正向这头走!");
			Thread.sleep((int) (Math.random() * 10000));
			System.out.println(Thread.currentThread().getName() + "到起跑点了!");
			comingTag.countDown();
			System.out.println("等待裁判说准备!");
			waitTag.await();
			System.out.println("各就各位!准备起跑姿势!");
			Thread.sleep((int) (Math.random() * 10000));
			waitRunTag.countDown();
			beginTag.await();
			System.out.println(Thread.currentThread().getName()
					+ " 运行员起跑 并且跑赛过程用时不确定");
			Thread.sleep((int) (Math.random() * 10000));
			endTag.countDown();
			System.out.println(Thread.currentThread().getName() + " 运行员到达终点");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

public static void main(String[] args) {
		try {
			CountDownLatch comingTag = new CountDownLatch(10);
			CountDownLatch waitTag = new CountDownLatch(1);
			CountDownLatch waitRunTag = new CountDownLatch(10);
			CountDownLatch beginTag = new CountDownLatch(1);
			CountDownLatch endTag = new CountDownLatch(10);

			MyThread[] threadArray = new MyThread[10];
			for (int i = 0; i < threadArray.length; i++) {
				threadArray[i] = new MyThread(comingTag, waitTag, waitRunTag,
						beginTag, endTag);
				threadArray[i].start();
			}
			System.out.println("裁判员在等待选手的到来!");
			comingTag.await();
			System.out.println("裁判看到所有运动员来了,各就各位前“巡视”用时5秒");
			Thread.sleep(5000);
			waitTag.countDown();
			System.out.println("各就各位!");
			waitRunTag.await();
			Thread.sleep(2000);
			System.out.println("发令枪响起!");
			beginTag.countDown();
			endTag.await();
			System.out.println("所有运动员到达,统计比赛名次!");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}
  • 方法await(long timeout, TimeUnit unit)的作用使线程在指定的最大时间单位内进入WAITING状态,如果超过这个时间则自动唤醒,程序继续向下运行。参数timeout是等待的时间,而unit参数是时间的单位
  • 类CyclicBarrier不仅有CountDownLatch所具有的功能,还可以实现屏障等待的功能,也就是阶段性同步,它在使用上的意义在于可以循环地实现线程要一起做任务的目标,而不是像类CountDownLatch一样,仅仅支持一次线程与同步点阻塞的特性
  • 类CyclicBarrier和Semaphore及CountDown-Latch一样,也是一个同步辅助类。它允许一组线程互相等待,直到到达某个公共屏障点(commonbarrier point),这些线程必须实时地互相等待,这种情况下就可以使用CyclicBarrier类来方便地实现这样的功能。CyclicBarrier类的公共屏障点可以重用,所以类的名称中有“cyclic循环”的单词。
    1)CountDownLatch作用:一个线程或者多个线程,等待另外一个线程或多个线程完成某个事情之后才能继续执行。
    2)CyclicBarrier的作用:多个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待,所以对于CyclicBarrier来说,重点是“多个线程之间”任何一个线程没有完成任务,则所有的线程都必须等待
  • 所有线程都到达同步点时再继续运行
public class MyThread extends Thread {

	private CyclicBarrier cbRef;

	public MyThread(CyclicBarrier cbRef) {
		super();
		this.cbRef = cbRef;
	}

	@Override
	public void run() {
		try {
			Thread.sleep((int) (Math.random() * 1000));
			System.out.println(Thread.currentThread().getName() + " 到了! "
					+ System.currentTimeMillis());
			cbRef.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

public static void main(String[] args) {
		CyclicBarrier cbRef = new CyclicBarrier(5, new Runnable() {
			public void run() {
				System.out.println("全都到了!");
			}
		});
		MyThread[] threadArray = new MyThread[5];
		for (int i = 0; i < threadArray.length; i++) {
			threadArray[i] = new MyThread(cbRef);
		}
		for (int i = 0; i < threadArray.length; i++) {
			threadArray[i].start();
		}
	}

  • 线程个数大于parties数量时分批处理
public class ThreadA extends Thread {

	private CyclicBarrier cbRef;

	public ThreadA(CyclicBarrier cbRef) {
		super();
		this.cbRef = cbRef;
	}

	@Override
	public void run() {
		try {
			System.out.println(Thread.currentThread().getName() + " begin ="
					+ System.currentTimeMillis() + " 等待凑齐2个继续运行");
			cbRef.await();
			System.out.println(Thread.currentThread().getName() + "   end ="
					+ System.currentTimeMillis() + " 已经凑齐2个继续运行");
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}

	}

}

	public static void main(String[] args) throws InterruptedException {
		CyclicBarrier cbRef = new CyclicBarrier(2, new Runnable() {
			@Override
			public void run() {
				System.out.println("全来了!");
			}
		});

		for (int i = 0; i < 4; i++) {
			ThreadA threadA1 = new ThreadA(cbRef);
			threadA1.start();
			Thread.sleep(2000);
		}
	}
  • 类CyclicBarrier具有屏障重置性
public class ThreadA extends Thread {

	private CyclicBarrier cbRef;

	public ThreadA(CyclicBarrier cbRef) {
		super();
		this.cbRef = cbRef;
	}

	@Override
	public void run() {
		try {
			cbRef.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}

	}

}

	public static void main(String[] args) throws InterruptedException {
		CyclicBarrier cbRef = new CyclicBarrier(2);

		ThreadA threadA1 = new ThreadA(cbRef);
		threadA1.start();
		Thread.sleep(500);
		System.out.println(cbRef.getNumberWaiting());

		ThreadA threadA2 = new ThreadA(cbRef);
		threadA2.start();
		Thread.sleep(500);
		System.out.println(cbRef.getNumberWaiting());

		ThreadA threadA3 = new ThreadA(cbRef);
		threadA3.start();
		Thread.sleep(500);
		System.out.println(cbRef.getNumberWaiting());

		ThreadA threadA4 = new ThreadA(cbRef);
		threadA4.start();
		Thread.sleep(500);
		System.out.println(cbRef.getNumberWaiting());

	}

  • 方法reset()的作用是重置屏障
public class MyService {

	public CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
		@Override
		public void run() {
			System.out.println("                        彻底结束了 "
					+ System.currentTimeMillis());
		}
	});

	public void testMethod() {
		try {
			System.out.println(Thread.currentThread().getName() + " 准备!"
					+ System.currentTimeMillis());
			cyclicBarrier.await();
			System.out.println(Thread.currentThread().getName() + " 结束!"
					+ System.currentTimeMillis());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}

}

	public static void main(String[] args) throws InterruptedException {
		MyService service = new MyService();
		MyThreadA a = new MyThreadA(service);
		a.setName("A");
		MyThreadB b = new MyThreadB(service);
		b.setName("B");
		// 线程C未实例化
		a.start();
		b.start();

		Thread.sleep(2000);
		service.cyclicBarrier.reset();

	}

第三章 Phaser的使用

  • 方法arriveAndAwaitAdvance()的作用与CountDownLatch类中的await()方法大体一样,通过从方法的名称解释来看,arrive是到达的意思,wait是等待的意思,而advance是前进、促进的意思,所以执行这个方法的作用就是当前线程已经到达屏障,在此等待一段时间,等条件满足后继续向下一个屏障继续执行。
public class PrintTools {

	public static Phaser phaser;

	public static void methodA() {
		System.out.println(Thread.currentThread().getName() + " A1 begin="
				+ System.currentTimeMillis());
		phaser.arriveAndAwaitAdvance();
		System.out.println(Thread.currentThread().getName() + " A1   end="
				+ System.currentTimeMillis());

		System.out.println(Thread.currentThread().getName() + " A2 begin="
				+ System.currentTimeMillis());
		phaser.arriveAndAwaitAdvance();
		System.out.println(Thread.currentThread().getName() + " A2   end="
				+ System.currentTimeMillis());
	}

	public static void methodB() {
		try {
			System.out.println(Thread.currentThread().getName() + " A1 begin="
					+ System.currentTimeMillis());
			Thread.sleep(5000);
			phaser.arriveAndAwaitAdvance();
			System.out.println(Thread.currentThread().getName() + " A1   end="
					+ System.currentTimeMillis());

			System.out.println(Thread.currentThread().getName() + " A2 begin="
					+ System.currentTimeMillis());
			Thread.sleep(5000);
			phaser.arriveAndAwaitAdvance();
			System.out.println(Thread.currentThread().getName() + " A2   end="
					+ System.currentTimeMillis());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

public static void main(String[] args) {
		Phaser phaser = new Phaser(3);
		PrintTools.phaser = phaser;

		ThreadA a = new ThreadA(phaser);
		a.setName("A");
		a.start();

		ThreadB b = new ThreadB(phaser);
		b.setName("B");
		b.start();

		ThreadC c = new ThreadC(phaser);
		c.setName("C");
		c.start();
	}
因为C调用methodB执行时间比较长,所以AB都要等待C执行到arriveAndAwaitAdvance()时才能凑够3个,继续往下执行
  • 方法arriveAndDeregister()的作用是使当前线程(运动员)退出比赛,并且使parties值减1
public class PrintTools {

	public static Phaser phaser;

	public static void methodA() {
		System.out.println(Thread.currentThread().getName() + " A1 begin="
				+ System.currentTimeMillis());
		phaser.arriveAndAwaitAdvance();
		System.out.println(Thread.currentThread().getName() + " A1   end="
				+ System.currentTimeMillis());

		System.out.println(Thread.currentThread().getName() + " A2 begin="
				+ System.currentTimeMillis());
		phaser.arriveAndAwaitAdvance();
		System.out.println(Thread.currentThread().getName() + " A2   end="
				+ System.currentTimeMillis());
	}

	public static void methodB() {
		try {
			System.out.println(Thread.currentThread().getName() + " A1 begin="
					+ System.currentTimeMillis());
			Thread.sleep(5000);
			System.out.println("A:" + phaser.getRegisteredParties());
			phaser.arriveAndDeregister();
			System.out.println("B:" + phaser.getRegisteredParties());
			System.out.println(Thread.currentThread().getName() + " A1   end="
					+ System.currentTimeMillis());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}


第四章 Executor与ThreadPoolExecutor的使用

  • Executor与ThreadPoolExecutor的使用
  • 使用Executors类的newCachedThreadPool()方法创建的是无界线程池,可以进行线程自动回收。所谓的“无界线程池”就是池中存放线程个数是理论上的Integer.MAX_VALUE最大值。
  • 使用newCachedThreadPool (ThreadFactory)定制线程工厂
public class MyThreadFactory implements ThreadFactory {

	public Thread newThread(Runnable r) {
		Thread thread = new Thread(r);
		thread.setName("定制池中的线程对象的名称" + Math.random());
		return thread;
	}
}

public static void main(String[] args) {
		MyThreadFactory threadFactory = new MyThreadFactory();
		ExecutorService executorService = Executors
				.newCachedThreadPool(threadFactory);
		executorService.execute(new Runnable() {
			public void run() {
				System.out.println("我在运行" + System.currentTimeMillis() + " "
						+ Thread.currentThread().getName());
			}
		});
	}

  • 使用newFixedThreadPool(int)方法创建有界线程池
  • 使用newFixedThreadPool(int, ThreadFactory)定制线程工厂
public class MyThreadFactory implements ThreadFactory {

	public Thread newThread(Runnable r) {
		Thread thread = new Thread(r);
		thread.setName("定制池中的线程对象的名称" + Math.random());
		return thread;
	}
}

public class Run {
	public static void main(String[] args) {
		MyThreadFactory threadFactory = new MyThreadFactory();
		ExecutorService executorService = Executors.newFixedThreadPool(2,
				threadFactory);
		Runnable runnable = new Runnable() {
			public void run() {
				try {
					System.out.println("begin 我在运行"
							+ System.currentTimeMillis() + " "
							+ Thread.currentThread().getName());
					Thread.sleep(3000);
					System.out.println("  end 我在运行"
							+ System.currentTimeMillis() + " "
							+ Thread.currentThread().getName());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};
		executorService.execute(runnable);
		executorService.execute(runnable);
		executorService.execute(runnable);
	}
}
  • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, longkeepAliveTime, TimeUnit unit, BlockingQueueworkQueue)
    参数解释如下:
    ❑ corePoolSize:池中所保存的线程数,包括空闲线程,也就是核心池的大小。
    ❑ maximumPoolSize:池中允许的最大线程数。
    ❑ keepAliveTime:当线程数量大于corePoolSize值时,在没有超过指定的时间内是不从线程池中将空闲线程删除的,如果超过此时间单位,则删除。
    ❑ unit:keepAliveTime参数的时间单位。
    ❑ workQueue:执行前用于保持任务的队列。此队列仅保持由execute方法提交的Runnable任务。

  • BlockingQueue只是一个接口,常用的实现类有LinkedBlockingQueue和ArrayBlocking-Queue。用LinkedBlockingQueue的好处在于没有大小限制,优点是队列容量非常大,所以执行execute()不会抛出异常,而线程池中运行的线程数也永远不会超过corePoolSize值,因为其他多余的线程被放入LinkedBlockingQueue队列中,keepAliveTime参数也就没有意义了。

为了更好地理解这些参数在使用上的一些关系,可以将它们进行详细化的注释:
1)A代表execute(runnable)欲执行的runnable的数量;
2)B代表corePoolSize;
3)C代表maximumPoolSize;
4)D代表A-B(假设A>=B);
5)E代表new LinkedBlockingDeque();队列,无构造参数;
6)F代表SynchronousQueue队列;
7)G代表keepAliveTime。
构造方法中5个参数之间都有使用上的关系,
在使用线程池的过程中大部分会出现如下5种过程:
如果A<=B,那么马上创建线程运行这个任务,并不放入扩展队列Queue中,其他参数功能忽略;
如果A>B&&A<=C&&E,则C和G参数忽略,并把D放入E中等待被执行;
如果A>B&&A<=C&&F,则C和G参数有效,并且马上创建线程运行这些任务,而不把D放入F中,D执行完任务后在指定时间后发生超时时将D进行清除;
如果A>B&&A>C&&E,则C和G参数忽略,并把D放入E中等待被执行;
如果A>B&&A>C&&F,则处理C的任务,其他任务则不再处理抛出异常。
链表队列和同步队列的区别:
1.当任务数小于核心线程数,两者表现相同
2.当任务数大于核心线程数,小于最大线程数,链表队列会将多出的任务放到队列,同步队列会创建新的线程执行,不放入队列
3.当任务数大于最大线程数,链表队列将大于核心线程数的任务放到队列,同步队列只会处理最大线程数的任务,其余任务跑出异常

  • 方法shutdown()和shutdownNow()与返回值

方法shutdown()的作用是使当前未执行完的线程继续执行,而不再添加新的任务Task,还有shutdown()方法不会阻塞,调用shutdown()方法后,主线程main就马上结束了,而线程池会继续运行直>到所有任务执行完才会停止。如果不调用shutdown()方法,那么线程池会一直保持下去,以便随时执行被添加的新Task任务。方法shutdownNow()的作用是中断所有的任务Task,并且抛出>InterruptedException异常,前提是在Runnable中使用if (Thread.currentThread().isInterrupted() == true)语句来判断当前线程的中断状态,而未执行的线程不再执行,也就是从执行队>列中清除。如果没有if(Thread.currentThread().isInterrupted() == true)语句及抛出异常的代码,则池中正在运行的线程直到执行完毕,而未执行的线程不再执行,也从执行队列中清除。

  • 方法isShutdown()的作用是判断线程池是否已经关闭。

  • 方法set/getRejectedExecutionHandler()

public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		System.out.println(((MyRunnable1) r).getUsername() + " 被拒绝执行");
	}
}

public static void main(String[] args) throws InterruptedException {
		MyRunnable1 myRunnable1 = new MyRunnable1("中国1");
		MyRunnable1 myRunnable2 = new MyRunnable1("中国2");
		MyRunnable1 myRunnable3 = new MyRunnable1("中国3");
		MyRunnable1 myRunnable4 = new MyRunnable1("中国4");

		ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 9999L,
				TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
		pool.setRejectedExecutionHandler(new MyRejectedExecutionHandler());
		pool.execute(myRunnable1);
		pool.execute(myRunnable2);
		pool.execute(myRunnable3);
		pool.execute(myRunnable4);
	}

线程池中的资源全部被占用的时候,对新添加的Task任务有不同的处理策略,
在默认的情况下,ThreadPoolExecutor类中有4种不同的处理方式:
❑ AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出RejectedExecutionException异常。
❑ CallerRunsPolicy:当任务添加到线程池中被拒绝时,会使用调用线程池的Thread线程对象处理被拒绝的任务。
❑ DiscardOldestPolicy:当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
❑ DiscardPolicy:当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

  • 方法afterExecute()和beforeExecute():在线程池ThreadPoolExecutor类中重写这两个方法可以对线程池中执行的线程对象实现监控
  • 方法getActiveCount()的作用是取得有多少个线程正在执行任务
  • 方法getCompletedTaskCount ()的作用是取得有多少个线程已经执行完任务了
  • 方法getMaximumPoolSize ()的作用是取得构造方法传入的maximumPoolSize参数值
  • 方法getPoolSize ()的作用是取得池中有多少个线程
  • 方法getTaskCount ()的作用是取得有多少个任务发送给了线程池

第五章 Future和Callable的使用

接口Callable与线程功能密不可分,但和Runnable的主要区别为:

  1. Callable接口的call()方法可以有返回值,而Runnable接口的run()方法没有返回值。
  2. Callable接口的call()方法可以声明抛出异常,而Runnable接口的run()方法不可以声明抛出异常。
  3. 执行完Callable接口中的任务后,返回值是通过Future接口进行获得的。
  • 方法submit()不仅可以传入Callable对象,也可以传入Runnable对象,说明submit()方法支持有返回值和无返回值的功能。
  • 方法get(long timeout, TimeUnit unit)的作用是在指定的最大时间内等待获得返回值,如果超时会抛出异常
  • 接口RejectedExecutionHandler的主要作用是当线程池关闭后依然有任务要执行时,可以实现一些处理。
  • execute和submit区别: 方法execute()没有返回值,而submit()方法可以有返回值。方法execute()在默认的情况下异常直接抛出,不能捕获,但可以通过自定义Thread-Factory的方式进行捕获,而submit()方法在默认的情况下,可以catch Execution-Exception捕获异常。

Callable接口与Runnable接口在对比时主要的优点是,Callable接口可以通过Future取得返回值。但需要注意的是,Future接口调用get()方法取得处理的结果值时是阻塞性的,也就是如果调用Future对象的get()方法时,任务尚未执行完成,则调用get()方法时一直阻塞到此任务完成时为止。如果是这样的效果,则前面先执行的任务一旦耗时很多,则后面的任务调用get()方法就呈阻塞状态,也就是排队进行等待,大大影响运行效率。也就是主线程并不能保证首先获得的是最先完成任务的返回值,这就是Future的缺点,影响效率

原文地址:https://www.cnblogs.com/Baronboy/p/14056735.html