completable 用法

CompletableFuture

  前面我们使用过jdk5 提出future的用法,但是在获取结果上并不是那么友好

  在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

  没有用过的同学,我们先来一个入门使用了解一下

public class CompletableFutureAction {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args) {

        //很少有这么使用的.
        CompletableFuture<Double> completableFuture = new CompletableFuture<>();

        new Thread(()->{
            double v = get();
            completableFuture.complete(v);
        }).start();


        System.out.println("程序执行");
        //当程序完成时自动回调,不需要阻塞
        completableFuture.whenComplete((v,t)->{
            Optional.ofNullable(v).ifPresent(System.out::println);
            Optional.ofNullable(t).ifPresent(x->{t.printStackTrace();});
        });
        System.out.println("程序执行后...");





    }

     static double get(){
        try {
            System.out.println("执行耗时任务");
            Thread.sleep(RANDOM.nextInt(10000));

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RANDOM.nextDouble();
    }


}
View Code

CompletableFuture.supplyAsync

我们一般不使用new的方式创建completableFuture

在main 方法中我们在使用一个例子来介绍它的使用

因为在mian方法中,主线程可能会提前结束所以我们在做这个例子的时候,需要对线程做一些阻塞

public class CompletableFutureAction1 {
    public static void main(String[] args) throws InterruptedException {
    //另一种方式来等到结果输出在结束
        ExecutorService executorService = Executors.newFixedThreadPool(2,r -> {
            Thread t = new Thread(r);
            //设置成守护线程->结果不一定能等到执行结束
            t.setDaemon(false);
            return t;
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        CompletableFuture.supplyAsync(CompletableFutureAction::get,executorService).whenComplete((v,t)->
        {
            Optional.of(v).ifPresent(System.out::println);
            atomicBoolean.set(false);
            executorService.shutdown();
            Optional.ofNullable(t).ifPresent(x->{t.printStackTrace();});
        });

        System.out.println("没有进入阻塞");
//        Thread.currentThread().join();
       /* while (atomicBoolean.get()){

        }*/


        executorService.execute(()-> System.out.println("2222"));

    }
}


我们可以使用这样的方式来判断获取到结果
  AtomicBoolean atomicBoolean = new AtomicBoolean(true);
CompletableFuture.supplyAsync(CompletableFutureAction::get).whenComplete((v,t)->
{
Optional.of(v).ifPresent(System.out::println);
atomicBoolean.set(false);
executorService.shutdown();
Optional.ofNullable(t).ifPresent(x->{t.printStackTrace();});
});

System.out.println("没有进入阻塞");
// Thread.currentThread().join();
while (atomicBoolean.get()){

}
 

comletableFuture流水线的工作

 CompletableFuture.supplyAsync(CompletableFutureAction::get,executorService)
                .thenApply(CompletableFutureAction2::multiply).whenComplete((v,t)->{
            Optional.of(v).ifPresent(System.out::println);
//            executorService.shutdown();
            Optional.ofNullable(t).ifPresent(x->{t.printStackTrace();}  );

        });
  public static double multiply(double value){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return  value*10d;
    }
   static double get(){
        try {
            System.out.println("=======执行耗时任务");
            Thread.sleep(RANDOM.nextInt(10000));

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
         double v = RANDOM.nextDouble();
         System.out.println(v);
         return v;
    }


执行的结果:

=======执行耗时任务
没有进入阻塞
0.808110430680034
8.08110430680034

 

多任务调度方式模拟:

        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);

        Stream<CompletableFuture<Double>> completableFutureStream = ids.parallelStream().map(i -> CompletableFuture.supplyAsync(() -> queryByid(i), executorService));
  //这边也是并行执行       
List
<Double> collect = completableFutureStream.map(future -> future.thenApply(CompletableFutureAction2::multiply)).map(CompletableFuture::join).collect(Collectors.toList()); System.out.println(collect);
public static double queryByid(double i){
return CompletableFutureAction.get();
}
 

comletableFuture API

原文地址:https://www.cnblogs.com/bj-xiaodao/p/10781340.html