CompletableFuture常用方法详解

了解CompletableFuture

在这里插入图片描述
Doug Lea,并发包的作者…就不多说了,敬佩
CompletableFuture实现了两个接口,一个是Future.一个是CompletionStage;future算是一种模式,对结果异步结果的封装,相当于异步结果,而CompletionStage相当于完成阶段,多个CompletionStage可以以流水线的方式组合起来,共同完成任务.

初始化CompletableFuture

CompletableFuture有五个静态方法,
在这里插入图片描述
这个返回已经计算好结果的,封装为CompletableFuture返回(我在想,既然已经计算好了,那我还要使用CompletableFuture干啥?我使用CompletableFuture就是为了异步处理结果数据,异步分为获取结果异步,和异步处理结果,这两个地方最耗时,我感觉这个用的不是很多):

  • public static CompletableFuture completedFuture(U value)

而其他四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

  • public static CompletableFuture runAsync(Runnable runnable)
  • public static CompletableFuture runAsync(Runnable runnable, Executor executor)
  • public static CompletableFuture supplyAsync(Supplier supplier)
  • public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

根据返回结果和请求参数我们就知道,一个是又返回结果的,一个是void的,一个是传线程池的,一个是没有传线程池的,这个具体传不传还是需要看业务,一般如果执行的比较多,就创个线程池去处理,没有多少访问的使用默认的线程池也没事,只是出了问题不好定位…具体看情况.

常用方法

获取返回结果:

在这里插入图片描述
get()方法肯定都用过,阻塞获取返回结果,下面就是加上超时时间,getNow(T)就是立马获取结果,如果还没有算出来结果,那就直接返回我传进去的默认值,用的也不多
代码测试:

 @Test
  public void test1() throws Exception {
    CompletableFuture<Integer> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
          System.out.println("正在疯狂计算...");
          try {
            TimeUnit.SECONDS.sleep(3);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          return 1 + 1;
        }
    );
//    System.out.println("阻塞获取结果...");
//    Integer integer = objectCompletableFuture.get();
//    System.out.println(integer);

    System.out.println("阻塞获取结果...");
    Integer integer = objectCompletableFuture.getNow(222);
    System.out.println(integer);

  }

这是get();会阻塞
在这里插入图片描述
getNow()不会阻塞.
在这里插入图片描述

异步执行结果(相当于设置处理器,执行完成,自动执行处理器,异步):

方法如下:

  • public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)
  • public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  • public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
  • public CompletableFuture exceptionally(Function<Throwable,? extends T> fn)
    这些方法带着Async就是表示异步执行,表示可能会有其他线程来执行,如果不带Async,表示由刚刚返回结果的线程执行,如果传了线程池,则会有线程池里面其他线程来执行,也有可能是同一条线程,(建议反复阅读)
    参数的话,第一个是结果了,第二个是异常,直接使用lambda表达式表示即可,返回参数都是CompletableFuture,跟原来的CompletableFuture不是同一个对象,但是结果跟异常是一样的我用代码写出来了,exceptionally方法用的也不多,应该是如果出现了异常,重新进行计算或者直接返回默认值把.
    代码:
 public static void main(String[] args) {

    CountDownLatch countDownLatch = new CountDownLatch(1);
    CompletableFuture<Integer> integerCompletableFuture = new CompletableFuture<>();

    CompletableFuture<Integer> integerCompletableFuture1 = CompletableFuture.completedFuture(3);
    try {
      Integer integer = integerCompletableFuture1.get();
      System.out.println(1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
    
    CompletableFuture<Integer> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
      try {
        System.out.println("进行一连串操作....");
        TimeUnit.SECONDS.sleep(3);
//        int i = 3 / 0;
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      throw new NullPointerException("aa");
//      return 3;
    });
    System.out.println(integerCompletableFuture == uCompletableFuture);
    System.out.println("设置whenComplete方法....");
    CompletableFuture<Integer> integerCompletableFuture2 = uCompletableFuture.whenComplete((x, y) -> {
      System.out.println("执行完成!" + x);
      System.out.println("执行完成!" + y);
      throw new NullPointerException("222");
    });
    integerCompletableFuture2.whenComplete((x, y) -> {
      System.out.println("执行完成!" + x);
      System.out.println("执行完成!" + y);
      countDownLatch.countDown();
    });
    System.out.println("返回的integerCompletableFuture2是否相同" + (integerCompletableFuture2 == uCompletableFuture));
    try {
      countDownLatch.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

执行结果:两个异常是相同的,但是类不是同一个类…
在这里插入图片描述

串行执行,依赖上个任务的结果进行操作

对应的方法:

   public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

使用测试:

 @Test
  public void test1() throws Exception {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
      System.out.println(Thread.currentThread().getName()+"进行一连串操作1....");
      try {
        TimeUnit.SECONDS.sleep(3);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return 1;
    }).thenApplyAsync(x -> {
      try {
        System.out.println(Thread.currentThread().getName()+"进行一连串操作2....");
        TimeUnit.SECONDS.sleep(3);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return x + 1;
    });
    CompletableFuture<Integer> future = future1.thenApplyAsync(x -> {
      try {
        System.out.println(Thread.currentThread().getName()+"进行一连串操作3....");
        TimeUnit.SECONDS.sleep(3);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return x + 1;
    });

    Integer integer = future.get();
    System.out.println(integer);

  }

串行执行,进行最后计算操作,没有返回值

源码:

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                   Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

这几个方法相当于就是消耗,没有返回值:
测试:

@Test
  public void test2() throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(1);

    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
      System.out.println(Thread.currentThread().getName()+"进行一连串操作1....");
      try {
        TimeUnit.SECONDS.sleep(3);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return 1;
    });
    future1.thenRun(() -> {
      System.out.println(Thread.currentThread().getName()+"进行一连串操作2....");
      countDownLatch.countDown();
    });
    countDownLatch.await();
    System.out.println(1);

  }

这就是几个简单常用的CompletableFuture方法,已经使用的就这些,什么都执行完在执行啊,allOf,有一个执行完执行anyOf,这些暂时没碰到,但是具体使用都比较简单,因为没有具体使用,暂时就不进行记录了…

世界上所有的不公平都是由于当事人能力不足造成的.
原文地址:https://www.cnblogs.com/javayida/p/13346762.html