1 callable和future
http://blog.csdn.net/zy_281870667/article/details/72047325
一般情况,我们实现多线程都是Thread或者Runnable(后者比较多),但是,这两种都是没返回值的,所以我们需要使用callable(有返回值的多线程)和future(获得线程的返回值)来实现了。
public class TestThread {
public static void main(String[] args) {
ThreadCount tc = null;
ExecutorService es = Executors.newCachedThreadPool();//线程池
CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es);
for(int i=0;i<4;i++){
tc = new ThreadCount(i+1);
cs.submit(tc);
}
// 添加结束,及时shutdown,不然主线程不会结束
es.shutdown();
int total = 0;
for(int i=0;i<4;i++){
try {
total+=cs.take().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println(total);
}
}
class ThreadCount implements Callable<Integer>{
private int type;
ThreadCount(int type){
this.type = type;
}
@Override
public Integer call() throws Exception {
if(type==1){
System.out.println("C盘统计大小");
return 1;
}else if(type==2){
Thread.sleep(20000);
System.out.println("D盘统计大小");
return 2;
}else if(type==3){
System.out.println("E盘统计大小");
return 3;
}else if(type==4){
System.out.println("F盘统计大小");
return 4;
}
return null;
}
}
ps:一个需要注意的小细节,cs.take.get()获取返回值(这里阻塞?),是按照完成的顺序的,即上面案例返回顺序是CEFD
tips:
ExecutorCompletionService 是将 Executor和BlockQueue结合的jdk类,其实现的主要目的是:提交任务线程,每一个线程任务直线完成后,将返回值放在阻塞队列中,然后可以通过阻塞队列的take()方法返回 对应线程的执行结果!!
2. join阻塞——直接用join把线程5加入进去即可
http://blog.csdn.net/u011971132/article/details/62444282
直接用join把线程5加入进去即可
public static void main(String[] args) throws InterruptedException
{
Thread t1 = new Thread(new Worker("thread-1"));
Thread t2 = new Thread(new Worker("thread-2"));
Thread t3 = new Thread(new Worker("thread-3"));
Thread t4 = new Thread(new Worker("thread-4"));
Thread t5 = new Thread(new Worker("thread-5"));
t1.start();t2.start();t3.start();t4.start();
t1.join();t2.join();t3.join();t4.join();
t5.start();
t5.join();
}
或者,在t5的run:
t1.start();t2.start();t3.start();t4.start(); t1.join();t2.join();t3.join();t4.join();
t5线程中等待四个线程返回
3.用java.util.concurrent下的方法解决
http://blog.csdn.net/wenwen360360/article/details/62104612
用CountDownLatch : 一个线程(或者多个),
等待另外N个线程完成某个事情之后才能执行
CountDownLatch 是计数器, 线程完成一个就记一个, 就像 报数一样, 只不过是递减的.
盗用别人的一个例子:
-
public class CountDownLatchDemo {
-
final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
public static void main(String[] args) throws InterruptedException {
-
CountDownLatch latch=new CountDownLatch(2);//两个工人的协作
-
Worker worker1=new Worker("zhang san", 5000, latch);
-
Worker worker2=new Worker("li si", 8000, latch);
-
worker1.start();//
-
worker2.start();//
-
latch.await();//等待所有工人完成工作
-
System.out.println("all work done at "+sdf.format(new Date()));
-
}
-
-
-
static class Worker extends Thread{
-
String workerName;
-
int workTime;
-
CountDownLatch latch;
-
public Worker(String workerName ,int workTime ,CountDownLatch latch){
-
this.workerName=workerName;
-
this.workTime=workTime;
-
this.latch=latch;
-
}
-
public void run(){
-
System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));
-
doWork();//工作了
-
System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));
-
latch.countDown();//工人完成工作,计数器减一
-
-
}
-
-
private void doWork(){
-
try {
-
Thread.sleep(workTime);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
-
-
}
CyclicBarrier :
N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
这样应该就清楚一点了,对于CountDownLatch来说,重点是那个“一个线程”,
是它在等待, 而另外那N的线程在把“某个事情”做完之后可以继续等待,可以终止。而对于CyclicBarrier来说,重点是那N个线程,他们之间任何一个没有完成,所有的线程都必须等待。
CyclicBarrier更像一个水闸,
线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.
http://blog.csdn.net/gongpulin/article/details/51236398
-
import java.util.concurrent.BrokenBarrierException;
-
import java.util.concurrent.CyclicBarrier;
-
public class CyclicBarrierTest {
-
public static void main(String[] args) {
-
-
-
CyclicBarrier cb = new CyclicBarrier(5, new MainTask());
-
new SubTask("A", cb).start();
-
new SubTask("B", cb).start();
-
new SubTask("C", cb).start();
-
new SubTask("D", cb).start();
-
new SubTask("E", cb).start();
-
}
-
}
-
-
-
-
-
class MainTask implements Runnable {
-
public void run() {
-
//根据jdkdoc里的描述,哪个线程最后运行完,就执行下面的代码。
-
System.out.println("......执行最后的任务了......");
-
}
-
}
-
-
-
-
-
class SubTask extends Thread {
-
private String name;
-
private CyclicBarrier cb;
-
-
SubTask(String name, CyclicBarrier cb) {
-
this.name = name;
-
this.cb = cb;
-
}
-
-
public void run() {
-
System.out.println("[并发任务" + name + "] 开始执行");
-
for (int i = 0; i < 999999; i++) ;
-
System.out.println("[并发任务" + name + "] 开始执行完毕,通知障碍器");
-
try {
-
-
cb.await();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (BrokenBarrierException e) {
-
e.printStackTrace();
-
}
-
}
-
}
结果:
[并发任务A] 开始执行
[并发任务B] 开始执行
[并发任务B] 开始执行完毕,通知障碍器
[并发任务C] 开始执行
[并发任务C] 开始执行完毕,通知障碍器
[并发任务A] 开始执行完毕,通知障碍器
[并发任务D] 开始执行
[并发任务D] 开始执行完毕,通知障碍器
[并发任务E] 开始执行
[并发任务E] 开始执行完毕,通知障碍器
......执行最后的任务了......
4. ExecutorService类提供其他版本的invokeAll()阻塞——这还是一个更详细future的例子,其机制类似于join,这里拿出来
线程执行者(六)运行多个任务并处理所有结果
http://ifeve.com/thread-executors-6/
执行者框架允许你在不用担心线程创建和执行的情况下,并发的执行任务。它还提供了Future类,这个类可以用来控制任务的状态,也可以用来获得执行者执行任务的结果。
如果你想要等待一个任务完成,你可以使用以下两种方法:
- 如果任务执行完成,Future接口的isDone()方法将返回true。——注意,是立即返回,不会阻塞等待
-
ThreadPoolExecutor类的awaitTermination()方法使线程进入睡眠,直到每一个任务调用shutdown()方法之后完成执行。
这两种方法都有一些缺点。第一个方法,你只能控制一个任务的完成。第二个方法,你必须等待一个线程来关闭执行者,否则这个方法的调用立即返回。
ThreadPoolExecutor类提供一个方法,允许你提交任务列表给执行者,并且在这个列表上等待所有任务的完成。在这个指南中,你将学习如何使用这个特性,实现一个示例,执行3个任务,并且当它们完成时将结果打印出来。
准备工作…
这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。
如何做…
按以下步骤来实现的这个例子:
1.创建Result类,存储这个示例中并发任务产生的结果。
2.声明两个私有属性。一个String属性,名为name,另一个int属性,名为value。
3.实现相应的get()和set()方法,用来设置和获取name和value属性的值。
01 |
public String
getName() { |
04 |
public void setName(String
name) { |
07 |
public int getValue()
{ |
11 |
public void setValue( int value)
{ |
4.创建Task类,实现Callable接口,参数化为Result类型。
1 |
public class Task implements Callable<Result>
{ |
5.声明一个私有String属性,名为name。
6.实现Task类构造器,初始化这个属性。
1 |
public Task(String
name) { |
7.实现这个类的call()方法,在本例中,它将返回一个Result对象。
2 |
public Result
call() throws Exception
{ |
8.首先,写入一个信息到控制台,表明任务开始。
1 |
System.out.printf( "%s:
Staring
" , this .name); |
9.然后,等待一个随机时间。
2 |
long duration=( long )(Math.random()* 10 ); |
3 |
System.out.printf( "%s:
Waiting %d seconds for results.
" , this .name,duration); |
4 |
TimeUnit.SECONDS.sleep(duration); |
5 |
} catch (InterruptedException
e) { |
10.在Result对象中返回一个计算5个随机数的总和的int值。
2 |
for ( int i= 0 ;
i< 5 ;
i++){ |
3 |
value+=( int )(Math.random()* 100 ); |
11.创建Result对象,用任务的名称和前面操作结果来初始化它。
1 |
Result
result= new Result(); |
2 |
result.setName( this .name); |
3 |
result.setValue(value); |
12.写入一个信息到控制台,表明任务已经完成。
1 |
System.out.println( this .name+ ":
Ends" ); |
13.返回Result对象。
14.最后,实现这个示例的主类,创建Main类,实现main()方法。
2 |
public static void main(String[]
args) { |
15.使用Executors类的newCachedThreadPool()方法,创建ThreadPoolExecutor对象。
1 |
ExecutorService
executor=(ExecutorService)Executors.newCachedThreadPool(); |
16.创建Task对象列表。创建3个Task对象并且用这个列表来存储。
1 |
List<Task>
taskList= new ArrayList<>(); |
2 |
for ( int i= 0 ;
i< 3 ;
i++){ |
17.创建Future对象列表,参数化为Result类型。
1 |
List<Future<Result>>resultList= null ; |
18.调用ThreadPoolExecutor类的invokeAll()方法。这个类将会返回之前创建的Future对象列表。
2 |
resultList=executor.invokeAll(taskList); |
3 |
} catch (InterruptedException
e) { |
19.使用shutdown()方法结束执行者。
20.写入处理Future对象列表任务的结果。
01 |
System.out.println( "Main:
Printing the results" ); |
02 |
for ( int i= 0 ;
i<resultList.size(); i++){ |
03 |
Future<Result>
future=resultList.get(i); |
05 |
Result
result=future.get(); |
06 |
System.out.println(result.getName()+ ":
" +result. |
08 |
} catch (InterruptedException
| ExecutionException e) { |
它是如何工作的…
在这个指南中,你已经学习了如何提交任务列表到执行者和使用invokeAll()方法等待它们的完成。这个方法接收Callable对象列表和返回 Future对象列表。这个列表将会有列表中每个任务的一个Future对象。Future对象列表的第一个对象是Callable对象列表控制的第一个任务,以此类推。
第一点要考虑的是,在存储结果对象的列表中声明的Future接口参数化的数据类型必须与使用的Callable对象的参数化相兼容。在本例中,你已经使用相同数据类型:Result类型。
另一个重要的一点就是关于invokeAll()方法,你会使用Future对象获取任务的结果。当所有的任务完成时,这个方法结束,如果你调用返回的Future对象的isDone()方法,所有调用都将返回true值。
不止这些…
ExecutorService类提供其他版本的invokeAll()方法:
- invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit):此方法执行所有任务,当它们全部完成且未超时,返回它们的执行结果。TimeUnit类是个枚举类,有如下常量:DAYS,HOURS,MICROSECONDS, MILLISECONDS, MINUTES,,NANOSECONDS
和SECONDS。
个人认为,此处用invoke方法不好,如同join一样,会阻塞线程发起方,主线程在等待返回的时候啥都做不了,但是意思是这个意思,符合题目
小结:
1.future模式
2.join阻塞
3.ExecutorService.invokeAll阻塞
4.countDownLatch
5.SyclicBarrier
6.回调-观察者模式:https://www.cnblogs.com/silyvin/p/9106633.html
7.锁 Synchronized ReetraintLock,本质与join同