Flink 源码(七):阅读 Flink 源码前必会的知识(二)Java 8 异步编程 CompletableFuture 全解析(二)

来源:https://mp.weixin.qq.com/s/7fEtXDQMWoay8zFN3x6bXw

5、CompletableFuture 接口精讲

5.1、提交执行的静态方法

方法名描述
runAsync(Runnable runnable) 执行异步代码,使用 ForkJoinPool.commonPool() 作为它的线程池
runAsync(Runnable runnable, Executor executor) 执行异步代码,使用指定的线程池
supplyAsync(Supplier<U> supplier) 异步执行代码,有返回值,使用 ForkJoinPool.commonPool() 作为它的线程池
supplyAsync(Supplier<U> supplier, Executor executor) 异步执行代码,有返回值,使用指定的线程池执行

上述四个方法,都是提交任务的,runAsync 方法需要传入一个实现了 Runnable 接口的方法,supplyAsync 需要传入一个实现了 Supplier 接口的方法,实现 get 方法,返回一个值。

(1)run 和 supply 的区别

run 就是执行一个方法,没有返回值,supply 执行一个方法,有返回值。

(2)一个参数和两个参数的区别

第二个参数是线程池,如果没有传,则使用自带的 ForkJoinPool.commonPool() 作为线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)

5.2、串行关系 api

这些 api 之间主要是能否获得前一个任务的返回值与自己是否有返回值的区别。

api是否可获得前一个任务的返回值是否有返回值
thenApply
thenAccept
thenRun 不能
thenCompose
(1) thenApply 和 thenApplyAsync 使用

thenApply 和 thenApplyAsync 把两个并行的任务串行化,另一个任务在获得上一个任务的返回值之后,做一些加工和转换。它也是有返回值的。

public class BasicFuture4 {

    @Data
    @AllArgsConstructor
    @ToString
    static class Student {
        private String name;
    }
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Student> future = CompletableFuture.supplyAsync(() -> "Jack")
                .thenApply(s -> s + " Smith")
                .thenApply(String::toUpperCase)
                .thenApplyAsync(Student::new);
        System.out.println(future.get());
    }

}

结果可以看到,输入是一个字符串,拼接了一个字符串,转换成大写,new 了一个 Student 对象返回。

BasicFuture4.Student(name=JACK SMITH)

和 thenApply 一起的还有 thenAccept 和 thenRun,thenAccept 能获得到前一个任务的返回值,但是自身没有返回值;thenRun 不能获得前一个任务的返回值,自身也没有返回值。

(2)thenApply 和 thenApplyAsync 的区别

这两个方法的区别,在于谁去执行任务。如果使用 thenApplyAsync,那么执行的线程是从 ForkJoinPool.commonPool() 或者自己定义的线程池中取线程去执行。如果使用 thenApply,又分两种情况,如果 supplyAsync 方法执行速度特别快,那么 thenApply 任务就使用主线程执行,如果 supplyAsync 执行速度特别慢,就是和 supplyAsync 执行线程一样。

可以使用下面的例子演示一下:

package com.dsj361.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @Author wangkai
 */
public class BasicFuture8 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("----------supplyAsync 执行很快");
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return "1";
        }).thenApply(s -> {
            System.out.println(Thread.currentThread().getName());
            return "2";
        });
        System.out.println(future1.get());

        System.out.println("----------supplyAsync 执行很慢");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread().getName());
            return "1";
        }).thenApply(s -> {
            System.out.println(Thread.currentThread().getName());
            return "2";
        });
        System.out.println(future2.get());
    }
}

执行结果:

----------supplyAsync 执行很快
ForkJoinPool.commonPool-worker-1
main
2
----------supplyAsync 执行很慢
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
2
(3)thenCompose 的使用

假设有两个异步任务,第二个任务想要获取第一个任务的返回值,并且做运算,我们可以用 thenCompose。此时使用 thenApply 也可以实现,看一段代码发现他们的区别:

public class BasicFuture9 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = getLastOne().thenCompose(BasicFuture9::getLastTwo);
        System.out.println(future.get());

        CompletableFuture<CompletableFuture<String>> future2 = getLastOne().thenApply(s -> getLastTwo(s));
        System.out.println(future2.get().get());
    }

    public static CompletableFuture<String> getLastOne(){
        return CompletableFuture.supplyAsync(()-> "topOne");
    }

    public static CompletableFuture<String> getLastTwo(String s){
        return CompletableFuture.supplyAsync(()-> s + "  topTwo");
    }
}

可以看到使用 thenApply 的时候,需要使用两个 get() 方法才能获取到最终的返回值,使用 thenCompose 只要一个即可。

5.3、And 汇聚关系 Api

(1)thenCombine 的使用

加入我们要计算两个异步方法返回值的和,就必须要等到两个异步任务都计算完才能求和,此时可以用 thenCombine 来完成。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
    CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
    CompletableFuture<Integer> thenComposeCount = thenComposeOne
        .thenCombine(thenComposeTwo, (s, y) -> s + y);

    thenComposeOne.thenAcceptBoth(thenComposeTwo,(s,y)-> System.out.println("thenAcceptBoth"));
    thenComposeOne.runAfterBoth(thenComposeTwo, () -> System.out.println("runAfterBoth"));

    System.out.println(thenComposeCount.get());
}

可以看到 thenCombine 第二个参数是一个 Function 函数,前面两个异步任务都完成之后,使用这个函数来完成一些运算。

(2)thenAcceptBoth

接收前面两个异步任务的结果,执行一个回调函数,但是这个回调函数没有返回值。

(3)runAfterBoth

接收前面两个异步任务的结果,但是回调函数,不接收参数,也不返回值。

5.4、Or 汇聚关系 Api

public class BasicFuture11 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
        CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
        CompletableFuture<Integer> thenComposeCount = thenComposeOne
                .applyToEither(thenComposeTwo, s -> s + 1);

        thenComposeOne.acceptEither(thenComposeTwo,s -> {});
        
        thenComposeOne.runAfterEither(thenComposeTwo,()->{});

        System.out.println(thenComposeCount.get());
    }
}
(1)applyToEither

任何一个执行完就执行回调方法,回调方法接收一个参数,有返回值

(2)acceptEither

任何一个执行完就执行回调方法,回调方法接收一个参数,无返回值

(3)runAfterEither

任何一个执行完就执行回调方法,回调方法不接收参数,也无返回值

5.5、处理异常

上面我们讲了如何把几个异步任务编排起来,执行一些串行或者汇聚操作。还有一个重要的地方,就是异常的处理。

先看下面的例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture.supplyAsync(() -> {
        System.out.println("execute one ");
        return 100;
    })
        .thenApply(s -> 10 / 0)
        .thenRun(() -> System.out.println("thenRun"))
        .thenAccept(s -> System.out.println("thenAccept"));

    CompletableFuture.runAsync(() -> System.out.println("other"));
}

结果:

execute one 
other

可以发现,只要链条上有一个任务发生了异常,这个链条下面的任务都不再执行了。

但是 main 方法上的接下来的代码还是会执行的。

所以这个时候,需要合理的去处理异常来完成一些收尾的工作。

public class BasicFuture12 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("execute one ");
            return 100;
        })
                .thenApply(s -> 10 / 0)
                .thenRun(() -> System.out.println("thenRun"))
                .thenAccept(s -> System.out.println("thenAccept"))
                .exceptionally(s -> {
                    System.out.println("异常处理");
                    return null;
                });

        CompletableFuture.runAsync(() -> System.out.println("other"));
    }
}

可以使用  exceptionally 来处理异常。

使用 handle() 方法也可以处理异常。但是 handle() 方法的不同之处在于,即使没有发生异常,也会执行。

 

 

原文地址:https://www.cnblogs.com/qiu-hua/p/14489203.html