CompletableFuture用法介绍

一、CompletableFuture用法入门介绍

入门介绍的一个例子:

 1 package com.cy.java8;
 2 
 3 import java.util.Random;
 4 import java.util.concurrent.CompletableFuture;
 5 
 6 public class CompletableFutureInAction {
 7     private final static Random RANDOM = new Random(System.currentTimeMillis());
 8 
 9     public static void main(String[] args){
10         CompletableFuture<Double> completableFuture = new CompletableFuture<>();
11         new Thread(() -> {
12             double value = get();
13             completableFuture.complete(value);
14         }).start();
15 
16         System.out.println("do other things...");
17 
18         completableFuture.whenComplete((t, e) -> {
19             System.out.println("complete. value = "+ t);
20             if(e != null){
21                 e.printStackTrace();
22             }
23         });
24     }
25 
26     private static double get(){
27         try {
28             Thread.sleep(RANDOM.nextInt(3000));
29         } catch (InterruptedException e) {
30             e.printStackTrace();
31         }
32         return RANDOM.nextDouble();
33     }
34 }

console打印:

do other things...
complete. value = 0.8244376567363494

 

二、CompletableFuture.supplyAsync

CompletableFuture很少有直接new出来的方式去用的,一般都是通过提供的静态方法来使用。

1.使用CompletableFuture.supplyAsync来构造CompletableFuture:

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.*;
 4 
 5 import static com.cy.java8.CompletableFutureInAction.get;
 6 
 7 public class CompletableFutureInAction2 {
 8 
 9     public static void main(String[] args) {
10         /**
11          * 可以发现value=..没有被打印,为什么呢?
12          * 因为此方法构造出来的线程是demon的,守护进程,main执行结束之后就消失了,所以
13          * 根本没来得及执行whenComplete中的语句
14          */
15         CompletableFuture.supplyAsync(() -> get())
16                 .whenComplete((v, e) -> {
17                     System.out.println("value = " + v);
18                     if (e != null) {
19                         e.printStackTrace();
20                     }
21                 });
22 
23         System.out.println("do other things...");
24     }
25 
26 
27 }

2.要将上面whenComplete中的语句执行,进行改造:

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.*;
 4 import java.util.concurrent.atomic.AtomicBoolean;
 5 import static com.cy.java8.CompletableFutureInAction.get;
 6 
 7 public class CompletableFutureInAction2 {
 8 
 9     public static void main(String[] args) throws InterruptedException {
10         AtomicBoolean finished = new AtomicBoolean(false);
11 
12         CompletableFuture.supplyAsync(() -> get())
13                 .whenComplete((v, e) -> {
14                     System.out.println("value = " + v);
15                     if (e != null) {
16                         e.printStackTrace();
17                     }
18                     finished.set(true);
19                 });
20 
21         System.out.println("do other things...");
22 
23         while(!finished.get()){
24             Thread.sleep(1);
25         }
26     }
27 
28 
29 }

改写之后, main线程发现如果finished没有变为true就会一直等1ms,直到whenComplete执行将finished变为true。

3.上面的改写很low,其实只要将守护线程变为前台进程,main结束后不会消失就行了。

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.*;
 4 import static com.cy.java8.CompletableFutureInAction.get;
 5 
 6 public class CompletableFutureInAction2 {
 7 
 8     public static void main(String[] args){
 9         ExecutorService executorService = Executors.newFixedThreadPool(2, r -> {
10             Thread t = new Thread(r);
11             t.setDaemon(false);     //非守护线程
12             return t;
13         });
14 
15         CompletableFuture.supplyAsync(() -> get(), executorService)
16                 .whenComplete((v, e) -> {
17                     System.out.println("value = " + v);
18                     if (e != null) {
19                         e.printStackTrace();
20                     }
21                 });
22 
23         System.out.println("do other things...");
24 
25         //main执行结束之后,executorService线程不会结束,需要手动shutdown
26         executorService.shutdown();
27     }
28 
29 
30 }

三、thenApply:               

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 public class CompletableFutureInAction3 {
 8 
 9     public static void main(String[] args) {
10         ExecutorService executor = Executors.newFixedThreadPool(2, r -> {
11             Thread t = new Thread(r);
12             t.setDaemon(false);
13             return t;
14         });
15 
16         /**
17          * 将执行完的结果再*100
18          */
19         CompletableFuture.supplyAsync(CompletableFutureInAction::get, executor)
20                             .thenApply(v -> multiply(v))
21                             .whenComplete((v, e) -> System.out.println(v));
22     }
23 
24     private static double multiply(double value){
25         try {
26             Thread.sleep(1000);
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         }
30         return value * 100;
31     }
32 
33 }

console打印:

43.15351824222534

四、CompletableFuture.join()  

 1 package com.cy.java8;
 2 
 3 import java.util.Arrays;
 4 import java.util.List;
 5 import java.util.concurrent.CompletableFuture;
 6 import java.util.concurrent.ExecutorService;
 7 import java.util.concurrent.Executors;
 8 import java.util.stream.Collectors;
 9 
10 public class CompletableFutureInAction3 {
11 
12     public static void main(String[] args) {
13         ExecutorService executor = Executors.newFixedThreadPool(2, r -> {
14             Thread t = new Thread(r);
15             t.setDaemon(false);
16             return t;
17         });
18 
19         /**
20          * 需求:将一组商品列表里面的每个商品对应的价格查询出来,并将这个价格*100.
21          * 5个商品同时并发去做这件事
22          *
23          * CompletableFuture.join():等到所有的结果都执行结束,会返回CompletableFuture自己本身
24          * 执行完的结果,等于get()返回的结果。
25          */
26         List<Integer> productionIDs = Arrays.asList(1, 2, 3, 4, 5);     //待查的一组商品列表的ID
27         List<Double> priceList = productionIDs.stream().map(id -> CompletableFuture.supplyAsync(() -> queryProduction(id), executor))
28                                                         .map(future -> future.thenApply(price -> multiply(price)))
29                                                         .map(multiplyFuture -> multiplyFuture.join())
30                                                         .collect(Collectors.toList());
31         System.out.println(priceList);
32 
33         /**
34          * 按照以前,要5个分别for循环去查询
35          * 或者分多个线程去查询,再将每个线程查询的结果汇总,等到全部线程都执行完了,结果也就出来了
36          */
37     }
38 
39     private static double multiply(double value) {
40         try {
41             Thread.sleep(1000);
42         } catch (InterruptedException e) {
43             e.printStackTrace();
44         }
45         return value * 100;
46     }
47 
48     /**
49      * 模拟 根据商品id查询对应的价格
50      * @param id
51      * @return
52      */
53     private static double queryProduction(int id){
54         return CompletableFutureInAction.get();
55     }
56 }

console打印:

[90.93730009374265, 23.65282935900653, 17.415066430776815, 16.605197824452343, 60.143109082288206]

 说明:这里多个任务同时执行,最终把结果汇总到一起 ,这种都是并行去执行的,编写代码也比较简洁,不需要考虑多线程之间的一些交互、锁、多线程之间的通信、控制,都不需要去关心。

五、CompletableFuture的常用API介绍 

supplyAsync
thenApply
whenComplete
handle
thenRun
thenAccept
thenCompose
thenCombine
thenAcceptBoth

runAfterBoth
applyToEither
acceptEither
runAfterEither
anyOf
allOf

1)supplyAsync、thenApply、whenComplete前面的代码已经介绍了。

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction4 {
 6     public static void main(String[] args) throws InterruptedException {
 7         CompletableFuture.supplyAsync(() -> 1)
 8                 .thenApply(v -> Integer.sum(v,10))
 9                 .whenComplete((v, e) -> System.out.println(v));
10 
11         Thread.sleep(1000);
12     }
13 }

2)whenCompleteAsync:    whenComplete是同步的方式,如果对于结果的处理是比较占时间的,不想通过这种同步的方式去做,可以用whenCompleteAsync进行异步操作。

3)handle:和thenApply差不多,只是多了一个对于异常的考虑。

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction4 {
 6     public static void main(String[] args) throws InterruptedException {
 7         CompletableFuture.supplyAsync(() -> 1)
 8                 .handle((v, e) -> Integer.sum(v, 10))
 9                 .whenComplete((v, e) -> System.out.println(v));
10 
11         Thread.sleep(1000);
12     }
13 }

4)thenRun:如果想在completableFuture整个执行结束之后,还想进行一个操作,可以thenRun(Runnable r)

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction4 {
 6     public static void main(String[] args) throws InterruptedException {
 7         CompletableFuture.supplyAsync(() -> 1)
 8                 .handle((v, e) -> Integer.sum(v, 10))
 9                 .whenComplete((v, e) -> System.out.println(v))
10                 .thenRun(()-> System.out.println("thenRunning..."));
11 
12         Thread.sleep(1000);
13     }
14 }
11
thenRunning...

5)thenAccept:   thenAccept(Consumer c)里面传的是consumer,对执行结果进行消费,不会对执行结果进行任何操作。  

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction4 {
 6     public static void main(String[] args) throws InterruptedException {
 7         CompletableFuture.supplyAsync(() -> 1)
 8                         .thenApply(v -> Integer.sum(v, 10))
 9                         .thenAccept(System.out::println);
10 
11         Thread.sleep(1000);
12     }
13 }
11

6)thenCompose: 对执行结果再交给另外一个CompletableFuture,它再去对这个执行结果进行另外的计算。compose:组合,组合设计模式。

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction4 {
 6     public static void main(String[] args) throws InterruptedException {
 7         CompletableFuture.supplyAsync(() -> 1)
 8                         .thenCompose(v -> CompletableFuture.supplyAsync(() -> v * 10))
 9                         .thenAccept(System.out::println);
10         
11         Thread.sleep(1000);
12     }
13 }
10

7)thenCombine: thenCombine(CompletableFuture extends CompletionStage, BiFuntion)

          CompletableFuture的计算结果v1作为BiFunction的第1个入参,thenCombine中的第一个参数CompletableFuture的计算结果v2作为BiFunction的第2个入参,biFunction进行操作然后返回结果。

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction4 {
 6     public static void main(String[] args) throws InterruptedException {
 7         CompletableFuture.supplyAsync(() -> 1)
 8                         .thenCombine(CompletableFuture.supplyAsync(() -> 2.0), (v1, v2) -> v1 + v2)
 9                         .thenAccept(System.out::println);
10         
11         Thread.sleep(1000);
12     }
13 }
3.0

8)thenAcceptBoth: 和thenCombine差不多,只不过它的第二个参数是BiConsumer 

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction4 {
 6     public static void main(String[] args) throws InterruptedException {
 7         CompletableFuture.supplyAsync(() -> 1)
 8                         .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2.0), (v1, v2) -> {
 9                             System.out.println("value=" + (v1 + v2));
10                         });
11 
12         Thread.sleep(1000);
13     }
14 }
value=3.0

9)runAfterBoth:两个CompletableFuture都执行结束之后,run

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction5 {
 6     public static void main(String[] args) throws InterruptedException {
 7 
 8         CompletableFuture.supplyAsync(() -> {
 9             System.out.println(Thread.currentThread().getName() + " is running");
10             return 1;
11         }).runAfterBoth(CompletableFuture.supplyAsync(() -> {
12             System.out.println(Thread.currentThread().getName() + " is running too");
13             return 2;
14         }), () -> System.out.println("both done"));
15 
16         Thread.sleep(1000);
17     }
18 }
ForkJoinPool.commonPool-worker-9 is running
ForkJoinPool.commonPool-worker-9 is running too
both done

  

10)applyToEither

applyToEither:两个CompletableFuture只要有1个执行完了,就将这个CompletableFuture交给Function。谁先执行完就将谁交给Function去执行  

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction5 {
 6     public static void main(String[] args) throws InterruptedException {
 7 
 8         CompletableFuture.supplyAsync(() -> {
 9             try {
10                 Thread.sleep(900);
11             } catch (InterruptedException e) {
12                 e.printStackTrace();
13             }
14             System.out.println("I am future 1");
15             return 1;
16         }).applyToEither(CompletableFuture.supplyAsync(() -> {
17             try {
18                 Thread.sleep(50);
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             }
22             System.out.println("I am future 2");
23             return 2;
24         }), v -> {
25             System.out.println("value = " + v);
26             return v * 10;
27         }).thenAccept(System.out::println);
28 
29 
30         Thread.currentThread().join();
31     }
32 }
I am future 2
value = 2
20
I am future 1

  

11)acceptEither 

acceptEither:acceptEither(CompletableFuture extends CompletionStage, Consumer), 两个CompletableFuture谁先执行完成,就将谁的结果交给consumer执行。

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction5 {
 6     public static void main(String[] args) throws InterruptedException {
 7 
 8         CompletableFuture.supplyAsync(() -> {
 9             try {
10                 Thread.sleep(900);
11             } catch (InterruptedException e) {
12                 e.printStackTrace();
13             }
14             System.out.println("I am future 1");
15             return 1;
16         }).acceptEither(CompletableFuture.supplyAsync(() -> {
17             try {
18                 Thread.sleep(50);
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             }
22             System.out.println("I am future 2");
23             return 2;
24         }), v -> System.out.println("value = " + v));
25 
26         Thread.currentThread().join();
27     }
28 }
I am future 2
value = 2
I am future 1

  

12)runAfterEither 

 runAfterEither: runAfterEither(CompletionStage, Runnable),只要有一个CompletableFuture执行完了,就执行run

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 
 5 public class CompletableFutureInAction5 {
 6     public static void main(String[] args) throws InterruptedException {
 7 
 8         CompletableFuture.supplyAsync(() -> {
 9             try {
10                 Thread.sleep(900);
11             } catch (InterruptedException e) {
12                 e.printStackTrace();
13             }
14             System.out.println("I am future 1");
15             return 1;
16         }).runAfterEither(CompletableFuture.supplyAsync(() -> {
17             try {
18                 Thread.sleep(50);
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             }
22             System.out.println("I am future 2");
23             return 2;
24         }), () -> System.out.println("done."));
25 
26         Thread.currentThread().join();
27     }
28 }
I am future 2
done.
I am future 1

  

13)allOf

allOf(CompletableFuture<?>... cfs),返回值是CompletableFuture<Void>。要等所有的CompletableFuture都执行完成,才能执行下一步动作。

 1 package com.cy.java8;
 2 
 3 import java.util.Arrays;
 4 import java.util.List;
 5 import java.util.Random;
 6 import java.util.concurrent.CompletableFuture;
 7 import java.util.stream.Collectors;
 8 
 9 public class CompletableFutureInAction5 {
10     private final static Random RANDOM = new Random(System.currentTimeMillis());
11 
12     public static void main(String[] args) throws InterruptedException {
13         List<CompletableFuture<Double>> list = Arrays.asList(1, 2, 3, 4).stream()
14                 .map(i -> CompletableFuture.supplyAsync(CompletableFutureInAction5::get))
15                 .collect(Collectors.toList());
16 
17         //要等所有的CompletableFuture这些task执行完了,才会打印done.
18         CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()]))
19                 .thenRun(() -> System.out.println("done."));
20 
21         Thread.currentThread().join();
22     }
23 
24     static double get() {
25         try {
26             Thread.sleep(RANDOM.nextInt(3000));
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         }
30         double result = RANDOM.nextDouble();
31         System.out.println(result);
32         return result;
33     }
34 }
0.6446554001163166
0.24435437709196395
0.18251850071600362
0.5261702037394511
done.

  

14)anyOf

和allOf相反,只要有一个CompletableFuture执行完成,就会执行下一步动作

 1 package com.cy.java8;
 2 
 3 import java.util.Arrays;
 4 import java.util.List;
 5 import java.util.Random;
 6 import java.util.concurrent.CompletableFuture;
 7 import java.util.stream.Collectors;
 8 
 9 public class CompletableFutureInAction5 {
10     private final static Random RANDOM = new Random(System.currentTimeMillis());
11 
12     public static void main(String[] args) throws InterruptedException {
13         List<CompletableFuture<Double>> list = Arrays.asList(1, 2, 3, 4).stream()
14                 .map(i -> CompletableFuture.supplyAsync(CompletableFutureInAction5::get))
15                 .collect(Collectors.toList());
16 
17         //只要有一个CompletableFuture执行完了,就会打印done.
18         CompletableFuture.anyOf(list.toArray(new CompletableFuture[list.size()]))
19                 .thenRun(() -> System.out.println("done."));
20 
21         Thread.currentThread().join();
22     }
23 
24     static double get() {
25         try {
26             Thread.sleep(RANDOM.nextInt(3000));
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         }
30         double result = RANDOM.nextDouble();
31         System.out.println(result);
32         return result;
33     }
34 }
0.1334361442807943
done.
0.6715112881360222
0.12945359790698785
0.1307762755130788

  

----

原文地址:https://www.cnblogs.com/tenWood/p/11614336.html