Flink 源码(八):阅读 Flink 源码前必会的知识(三)Java 8 异步编程 CompletableFuture 全解析(三)

来源:https://mp.weixin.qq.com/s/7fEtXDQMWoay8zFN3x6bXw

6、烧水泡茶程序的实现

6.1、使用 Thread 多线程和 CountDownLatch 来实现

public class MakeTee {

    private static CountDownLatch countDownLatch = new CountDownLatch(2);

    static class HeatUpWater implements Runnable {

        private CountDownLatch countDownLatch;

        public HeatUpWater(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                System.out.println("洗水壶");
                Thread.sleep(1000);
                System.out.println("烧开水");
                Thread.sleep(5000);
                countDownLatch.countDown();
            } catch (InterruptedException e) {
            }

        }
    }

    static class PrepareTee implements Runnable {
        private CountDownLatch countDownLatch;

        public PrepareTee(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                System.out.println("洗茶壶");
                Thread.sleep(1000);
                System.out.println("洗茶杯");
                Thread.sleep(1000);
                System.out.println("拿茶叶");
                Thread.sleep(1000);
                countDownLatch.countDown();
            } catch (InterruptedException e) {
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new Thread(new HeatUpWater(countDownLatch) ).start();
        new Thread(new PrepareTee(countDownLatch)).start();
        countDownLatch.await();
        System.out.println("准备就绪,开始泡茶");
    }
}

这里我们使用两个线程,分别执行烧水和泡茶的程序,使用 CountDownLatch 来协调两个线程的进度,等到他们都执行完成之后,再执行泡茶的动作。

可以看到这种方法,多了很多不必要的代码,new Thread,人工维护 CountDownLatch 的进度。

6.2、使用 CompletableFuture 来实现

public class MakeTeeFuture {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("洗水壶");
                Thread.sleep(1000);
                System.out.println("烧开水");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("洗茶壶");
                Thread.sleep(1000);
                System.out.println("洗茶杯");
                Thread.sleep(1000);
                System.out.println("拿茶叶");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        CompletableFuture<Void> finish = future1.runAfterBoth(future2, () -> {
            System.out.println("准备完毕,开始泡茶");
        });
        System.out.println(finish.get());
    }
}

这个程序极度简单,无需手工维护线程,给任务分配线程的工作也不需要关注。

同时语义也更加清晰,future1.runAfterBoth(future2,......) 能够清晰的表述“任务 3 要等到任务 1 和任务 2 都完成之后才能继续开始”

然后代码更加简练并且专注于业务逻辑,几乎所有的代码都是业务逻辑相关的。

7、总结

本文介绍了异步编程的概念,以及 Java8 的 CompletableFuture 是如何优雅的处理多个异步任务之间的协调工作的。CompletableFuture 能够极大简化我们对于异步任务编排的工作,Flink 在提交任务时,也是使用这种异步任务的方式,去编排提交时和提交后对于任务状态处理的一些工作的。

相信读了本篇文章,会对于你日后的工作以及阅读 Flink 源码由很大的帮助的!

原文地址:https://www.cnblogs.com/qiu-hua/p/14489207.html