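Asynchronous orchestration with CompletableFuture

Starting an asynchronous task

CompletableFuture provides four static methods for creating an asynchronous operation.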

    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

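1. The runXxx methods produce no result; the supplyXxx methods produce a result that can be retrieved.

2. You can pass a custom thread pool; otherwise the default pool is used (ForkJoinPool.commonPool(), or a new thread per task when the common pool's parallelism is below 2).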

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //CompletableFuture.runAsync(new Runable01(),service);
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1, service);
        Integer integer = future.get();
        System.out.println(integer);
        service.shutdown();
    }
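
The example above covers supplyAsync with a custom pool. As a complement, here is a minimal sketch of runAsync without an Executor argument, so the task runs on the default pool. The class name RunAsyncDemo and the helper runOnDefaultPool are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAsyncDemo {
    // Hypothetical helper: runs a side-effecting task on the default pool
    // and reports whether it ran and whether the future carried no value.
    static boolean runOnDefaultPool() {
        AtomicBoolean ran = new AtomicBoolean(false);
        // No Executor argument, so the default asynchronous pool is used.
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> ran.set(true));
        // join() waits like get() but throws only unchecked exceptions.
        // A runAsync future completes with null because a Runnable returns nothing.
        Void result = future.join();
        return ran.get() && result == null;
    }

    public static void main(String[] args) {
        System.out.println(runOnDefaultPool()); // prints true
    }
}
```

Using join() instead of get() here is only a convenience: it avoids the checked ExecutionException and InterruptedException in the method signature.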

Callbacks on completion

    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor) {
        return uniWhenCompleteStage(screenExecutor(executor), action);
    }

    public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }

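whenComplete is invoked with both the normal result and the exception (one of them is null), so it can observe either outcome; exceptionally handles only the exceptional case and can supply a replacement value.

Difference between whenComplete and whenCompleteAsync:

 - whenComplete: the callback runs on the thread that completed the current task (or on the calling thread if the task has already completed when the callback is registered)

 - whenCompleteAsync: the callback is submitted to a thread pool (the default pool, or the given Executor)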
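
The threading difference can be observed by recording the thread name inside each callback. The following is a sketch under stated assumptions: the class name, the pool names "task-pool" and "callback-pool", and the latch that holds the task open until both callbacks are registered are all illustrative choices, not part of the original article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class WhenCompleteThreadDemo {
    // Returns {thread that ran whenComplete, thread that ran whenCompleteAsync}.
    static String[] callbackThreads() {
        ExecutorService taskPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "task-pool"));
        ExecutorService callbackPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        CountDownLatch registered = new CountDownLatch(1);
        String[] names = new String[2];
        try {
            CompletableFuture<Integer> source = CompletableFuture.supplyAsync(() -> {
                try {
                    // Hold the task open until both callbacks are registered.
                    registered.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return 1;
            }, taskPool);
            // Registered before completion, so it runs on the completing thread.
            CompletableFuture<Integer> sync = source.whenComplete(
                    (res, e) -> names[0] = Thread.currentThread().getName());
            // Always handed to the given executor.
            CompletableFuture<Integer> async = source.whenCompleteAsync(
                    (res, e) -> names[1] = Thread.currentThread().getName(), callbackPool);
            registered.countDown();
            sync.join();
            async.join();
        } finally {
            taskPool.shutdown();
            callbackPool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        String[] names = callbackThreads();
        System.out.println("whenComplete ran on: " + names[0]);
        System.out.println("whenCompleteAsync ran on: " + names[1]);
    }
}
```

Without the latch, the task could finish before whenComplete is registered, in which case the callback would run on the registering thread instead.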

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(() ->{
                    int i=1/0;
                    return 1;
                } , service)
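                        // whenComplete can observe the result or exception, but cannot change the returned value
                    .whenComplete((res,e)->System.out.println(res+":"+e))
                        // exceptionally can observe the exception and supply a replacement value
                    .exceptionally(e-> 10);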
        System.out.println(future.get());
        service.shutdown();
    }

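The handle method

handle receives the result or the exception of the preceding task, and whatever it returns becomes the new result, so it can both recover from a failure and change the return value.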

    public <U> CompletionStage<U> handle
        (BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync
        (BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync
        (BiFunction<? super T, Throwable, ? extends U> fn,
         Executor executor);

        // The supplier throws, so e is non-null and the future completes with 200
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(() ->{
                    int i=1/0;
                    return 1;
                } , service).handleAsync((r,e)->e==null?100:200,service);
        System.out.println(future.get());

Chaining tasks sequentially

    public <U> CompletionStage<U> thenApply
        (Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor);

    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

    public CompletionStage<Void> thenRun(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

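thenApply: used when one task depends on another; it receives the result of the previous task and returns a value of its own.

thenAccept: consumes the result of the previous task and returns no result.

thenRun: runs as soon as the previous task completes. It does not receive the previous task's result and returns no result either.

Methods with the Async suffix run the callback asynchronously on a thread pool. All of the above run only if the preceding task completes successfully.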

        // thenRunAsync test
        CompletableFuture<Void> future =
                CompletableFuture.supplyAsync(() ->1, service)
                    .thenRunAsync(()-> System.out.println("done"),service);

        // thenApplyAsync test
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(() ->1, service)
                    .thenApplyAsync((i)-> {
                        System.out.println("done");
                        return i;
                    },service);
        System.out.println(future.get());

        // thenAcceptAsync test
        CompletableFuture<Void> future =
                CompletableFuture.supplyAsync(() ->1, service)
                    .thenAcceptAsync((i)-> {
                        System.out.println("done"+":"+i);
                    },service);
        // prints null, because thenAccept produces no result
        System.out.println(future.get());
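
The requirement that the preceding task complete successfully can be illustrated with a failing supplier: the dependent thenApply function is never called and the exception propagates to the dependent future. This is a minimal sketch; the class name SkippedStageDemo and the message "boom" are placeholders.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class SkippedStageDemo {
    static String run() {
        AtomicBoolean applied = new AtomicBoolean(false);
        CompletableFuture<Integer> future = CompletableFuture
                .<Integer>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                // Not invoked, because the preceding stage failed.
                .thenApply(i -> {
                    applied.set(true);
                    return i + 1;
                });
        try {
            future.join();
            return "completed normally";
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException.
            return "applied=" + applied.get()
                    + ", cause=" + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints applied=false, cause=IllegalStateException
    }
}
```

To react to the failure inside the chain rather than at join(), handle or exceptionally can be placed after the failing stage.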

Combining two tasks (both must complete)

    public <U,V> CompletionStage<V> thenCombine
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor executor);
    public <U> CompletionStage<Void> thenAcceptBoth
        (CompletionStage<? extends U> other,
         BiConsumer<? super T, ? super U> action);
    public <U> CompletionStage<Void> thenAcceptBothAsync
        (CompletionStage<? extends U> other,
         BiConsumer<? super T, ? super U> action);
    public <U> CompletionStage<Void> thenAcceptBothAsync
        (CompletionStage<? extends U> other,
         BiConsumer<? super T, ? super U> action,
         Executor executor);
    public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

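Both tasks must complete before the combined task is triggered.

thenCombine: combines two futures, receives both results, and returns a value of its own.

thenAcceptBoth: combines two futures, receives both results and processes them, with no return value.

runAfterBoth: combines two futures without receiving their results; once both have completed it runs the given task, with no return value.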

        // thenCombine test
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(() ->1, service);
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(() ->2, service);
        CompletableFuture<Integer> future = future1.thenCombineAsync(future2, (t, u) -> t  + u, service);
        System.out.println(future.get());

        // thenAcceptBoth test
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(() ->1, service);
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(() ->2, service);
        CompletableFuture<Void> future = future1.thenAcceptBothAsync(future2, (t, u) -> System.out.println(t+u), service);
        System.out.println(future.get());

        // runAfterBoth test
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(() ->1, service);
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(() ->2, service);
        CompletableFuture<Void> future = future1.runAfterBothAsync(future2, ()-> System.out.println("f1 and f2 have finished"), service);
        System.out.println(future.get());

Combining two tasks (either one completes)

    public <U> CompletionStage<U> applyToEither
        (CompletionStage<? extends T> other,
         Function<? super T, U> fn);
    public <U> CompletionStage<U> applyToEitherAsync
        (CompletionStage<? extends T> other,
         Function<? super T, U> fn);
    public <U> CompletionStage<U> applyToEitherAsync
        (CompletionStage<? extends T> other,
         Function<? super T, U> fn,
         Executor executor);
    public CompletionStage<Void> acceptEither
        (CompletionStage<? extends T> other,
         Consumer<? super T> action);
    public CompletionStage<Void> acceptEitherAsync
        (CompletionStage<? extends T> other,
         Consumer<? super T> action);
    public CompletionStage<Void> acceptEitherAsync
        (CompletionStage<? extends T> other,
         Consumer<? super T> action,
         Executor executor);
    public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
                                                Runnable action);
    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,
         Runnable action);
    public CompletionStage<Void> runAfterEitherAsync
        (CompletionStage<?> other,
         Runnable action,
         Executor executor);

Usage is much the same as for the "both must complete" methods above, except that here only one of the two tasks needs to complete.

        // applyToEitherAsync test
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(() ->1, service);
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(() ->2, service);
        CompletableFuture<Integer> future = future1.applyToEitherAsync(future2, Function.identity(), service);
        System.out.println(future.get());
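
In the test above both futures finish almost immediately, so either value may be passed on. To make the "first one wins" behaviour visible, the following sketch blocks one task on a latch until the combined result has been read. The class name EitherDemo, the latch, and the string values are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EitherDemo {
    static String firstResult() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // This task cannot finish until the latch is released.
            CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "slow";
            }, pool);
            CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast", pool);
            // The function is applied to whichever result arrives first.
            String winner = slow.applyToEither(fast, s -> "winner: " + s).join();
            return winner;
        } finally {
            release.countDown(); // let the slow task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // prints winner: fast
    }
}
```

Note that the slower task is not cancelled when the other one wins; it simply keeps running and its result is ignored by the combined stage.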

Combining multiple tasks

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }

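allOf: completes when all of the given tasks have completed. The returned future carries no value (Void), so results are read from the individual futures.

anyOf: completes as soon as any one of the given tasks completes, with that task's result (typed as Object).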
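
A minimal sketch of both methods follows. The class name AllOfAnyOfDemo and the helpers sumAll and firstOf are hypothetical; for anyOf, one future is created already completed and the other is never completed, so the outcome does not depend on timing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AllOfAnyOfDemo {
    // Waits for three tasks with allOf, then reads each result individually.
    static int sumAll() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 1, pool);
            CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 2, pool);
            CompletableFuture<Integer> f3 = CompletableFuture.supplyAsync(() -> 3, pool);
            CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
            all.join(); // returns once every future has completed
            // These joins no longer block, since all three are done.
            return f1.join() + f2.join() + f3.join();
        } finally {
            pool.shutdown();
        }
    }

    // anyOf completes with the result of whichever future finishes first.
    static Object firstOf() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> done = CompletableFuture.completedFuture("ready");
        return CompletableFuture.anyOf(pending, done).join();
    }

    public static void main(String[] args) {
        System.out.println(sumAll());  // prints 6
        System.out.println(firstOf()); // prints ready
    }
}
```

Because anyOf returns CompletableFuture<Object>, the caller has to cast the result when the futures share a more specific type.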

Original article: https://www.cnblogs.com/wwjj4811/p/13945693.html