异步回调CompletableFuture

异步回调CompletableFuture

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

CompletableFuture作用类似JavaScript的Promise。

主要功能

静态方法:

  • CompletableFuture.supplyAsync(supplier):用指定的supplier创建一个有返回值的异步回调,异步回调的结果是supplier的计算结果。默认使用ForkJoinPool.commonPool() 线程池。

  • CompletableFuture.supplyAsync(supplier,executor):同上,不过使用指定线程池。

  • CompletableFuture.runAsync(runnable):创建一个没有返回值,即泛型为Void返回值为null的异步回调,其回调函数的入参为null。默认使用ForkJoinPool.commonPool()线程池。

  • CompletableFuture.runAsync(runnable,executor):同上,不过使用指定的线程池。

  • CompletableFuture.completedFuture(U value):用给定的值创建一个已经计算完成的异步回调,值为value。

  • CompletableFuture.allOf(completableFutures):返回一个新的异步回调,会在所有异步回调计算完成时完成。新completableFuture的值始终为null。如果有一个任务抛异常了,则新回调也会抛异常。如果completableFutures为空数组,则回调直接完成,值为null。

  • CompletableFuture.allOf(completableFutures):返回一个新的异步回调,会在任何一个异步回调计算完成时完成。新completableFuture的值为完成的那个异步回调的值。如果完成的那个异步回调抛异常了,则新的异步回调也会抛异常。如果completableFutures为空数组,则返回一个不会完成的completableFuture,该completableFuture的get()方法将一直阻塞。

实例方法:

  • complete(T value):在任务未结束时,使用value值作为任务执行的结果,直接结束任务,成功返回true。在任务执行结束之后执行此方法会返回false且不会改变结果。

  • completeExceptionally(Throwable ex):在任务未结束时,用给定的异常使任务异常结束,成功返回true,get()直接抛异常。在任务执行结束之后执行此方法会返回false且不会改变结果。

  • exceptionally(Function<Throwable,T> fn):任务异常结束时对抛出的异常进行格式化,产生的新的CompletableFuture实例与当前实例泛型相同。如果当前实例返回String,则fn的返回值也是String,也即新实例的任务执行结果为String。此fn只在任务异常时执行,任务正常结束时会用当前任务的执行结果直接完成新实例。

每个实例方法都对应三种实现:xxx(callback)xxxAsync(callback)xxxAsync(callback,executor)。以下代码测试了三种方法的区别。这种区别不适用于静态方法。

/**
 * 测试 xxx、xxxAsync 和 xxxAsync(callback,executor) 的区别。
 * 每次调用方法都会打印执行的线程名。
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        ExecutorService executorService  = MyThreadPoolFactory.getThreadPool();
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->"supplyAsync:"+Thread.currentThread().getName() );
        completableFuture.
                thenApply(t-> "thenApply:"+Thread.currentThread().getName() +"    "+ t).
                thenAccept(t-> System.out.println("thenAccept:"+Thread.currentThread().getName() +"    "+ t));
        completableFuture.
                thenApplyAsync(t-> "thenApplyAsync:"+Thread.currentThread().getName() +"    "+ t).
                thenAcceptAsync(t-> System.out.println("thenAcceptAsync:"+Thread.currentThread().getName() +"    "+ t));
        completableFuture.
                thenApplyAsync(t-> "thenApplyAsync+pool:"+Thread.currentThread().getName() +"    "+ t,executorService).
                thenAcceptAsync(t-> System.out.println("thenAcceptAsync+pool:"+Thread.currentThread().getName() +"    "+ t),executorService);
    }
}
//输出:
//thenAccept:main    thenApply:main    supplyAsync:ForkJoinPool.commonPool-worker-9
//thenAcceptAsync:ForkJoinPool.commonPool-worker-9    thenApplyAsync:ForkJoinPool.commonPool-worker-9    supplyAsync:ForkJoinPool.commonPool-worker-9
//thenAcceptAsync+pool:P0T1    thenApplyAsync+pool:P0T0    supplyAsync:ForkJoinPool.commonPool-worker-9

由上面的代码可以看出:

  • xxx(callback):用调用方法的线程执行回调函数callback。
  • xxxAsync(callback):用CompletableFuture内部的线程池调用回调函数callback。
  • xxxAsync(callback,executor):用给定的线程池executor调用回调函数callback。

上面的特点适用于以下方法:除了handle和whenComplete都是在任务正常处理完成没有抛异常时执行。

  • handle(BiFunction<T,Throwable,U> fn):对任务的执行结果进行处理,T为执行结果,Throwable是抛出的异常,U为返回值。
  • whenComplete(BiConsumer<T,Throwable> action):当异步回调任务执行完成时,调用BiConsumer函数,并返回新的CompletableFuture。action消费两个参数,第一个参数是任务的执行结果,第二个参数是任务抛出的异常。
  • thenApply(Function<T,U> fn):当任务处理完成后,对结果进行处理,并返回新的CompletableFuture。fn接收一个参数T,返回另一个参数U,两个参数的类型需要通过泛型指定。
  • thenAccept(Consumer<T> action):当任务处理完成后,对结果进行处理,并返回新的CompletableFuture
  • thenAcceptBoth(CompletionStage<U> other,BiConsumer<T,U> action): 同时处理两个任务的执行结果,并返回新的CompletableFuture
  • thenCombine(CompletionStage<U> other,BiFunction<T,U, V> fn):相当于两个completableFuture版的thenApply(Function<T,U> fn),会返回新的CompletableFuture
  • thenCompose(Function<T,CompletionStage<U>> fn):用当前任务的执行结果,构成一个新的completableFuture。相当于thenApply(Function<T,CompletionStage<U>> fn)
  • thenRun(Runnable action):当前任务执行完成之后执行action。
  • acceptEither(CompletionStage<T> other, Consumer<T> action):消费掉任何一个先完成的任务的结果。
  • applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn):处理任何一个先完成的任务的结果。
  • runAfterBoth(CompletionStage<?> other,Runnable action):在两个任务都完成之后执行action
  • runAfterEither(CompletionStage<?> other,Runnable action):在任何一个完成之后执行action。

总结:

  • thenxxx和acceptEither、applyToEither、runAfterBoth、runAfterEither都只在相关任务正常结束而非异常结束时执行回调。
  • 以上提到的这些方法除了complete和completeExceptional,都是返回新的CompletableFuture实例,因此都可以链式调用。
原文地址:https://www.cnblogs.com/macho8080/p/14869375.html