利用CompletableFuture优化程序的执行效率

一、线程池的Future模式

在了解java8的CompletableFuture之前,先通过Future来解决一个问题,看个例子:

假设现在有一个网站,首页有顶部Banner位、左边栏、右边栏、用户信息几大模块需要加载,现在出一个接口,要求包装并吐出这几大模块的内容

先来抽象一个首页接口对象:


public class WebModule {

    private String top; //顶部Banner位

    private String left; //左边栏

    private String right; //右边栏

    private String user; //用户信息

    //...get...set...

    @Override
    public String toString() {
        return String.format("top: %s;  left: %s;  right: %s;  user: %s", top, left, right, user);
    }
}

现在提供下面几个业务方法来获取这些信息:


private String getTop() { // 这里假设getTop需要执行200ms
        try {
            Thread.sleep(200L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "顶部banner位";
    }

    private String getLeft() { // 这里假设getLeft需要执行50ms
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "左边栏";
    }

    private String getRight() { // 这里假设getRight需要执行80ms
        try {
            Thread.sleep(80L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "右边栏";
    }

    private String getUser() { // 这里假设getUser需要执行100ms
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "用户信息";
    }

ok,现在来实现下这个接口:


// 同步获取
public WebModule getWebModuleMsgSync() {
   WebModule webModule = new WebModule();
   webModule.setTop(getTop());
   webModule.setLeft(getLeft());
   webModule.setRight(getRight());
   webModule.setUser(getUser());
   return webModule;
}

上面的代码会一次调用一个方法来赋值,最终返回接口对象,这个方法的最终耗时为几个业务方法耗时的总和:


通过同步方法获取首页全部信息消耗时间:435ms
结果为:top: 顶部banner位;  left: 左边栏;  right: 右边栏;  user: 用户信息

430ms左右的执行时间,其实这几个模块是相互独立没有影响的,因此可以使用线程池的Future模式来进行多线程处理优化:


// 异步获取
public WebModule getWebModuleMsgAsync() throws ExecutionException, InterruptedException {
   Future top = executorService.submit(this::getTop);
   Future left = executorService.submit(this::getLeft);
   Future right = executorService.submit(this::getRight);
   Future user = executorService.submit(this::getUser);
   WebModule webModule = new WebModule();
   webModule.setTop(top.get());
   webModule.setLeft(left.get());
   webModule.setRight(right.get());
   webModule.setUser(user.get());
   return webModule;
}

这几个方法会被异步执行,get方法会被阻塞,直到执行结束,运行结果如下:


通过异步方法获取首页全部信息消耗时间:276ms
结果为:top: 顶部banner位;  left: 左边栏;  right: 右边栏;  user: 用户信息

可以看到,执行速度几乎降了近200ms,这取决于最慢的那个任务的耗时。

通过上述的例子可以发现,很多程序都是可以通过异步充分利用CPU资源的方式来进行优化处理的,单看上面的程序没什么问题,但是仔细想想会发现太过局限,因为几个模块相互独立,但在实际开发中,我们可能存在B方法需要拿到A方法的结果才可以往下进行的问题,所以上面的程序就不太适用了,java8出现了今天要说的一个内容:CompletableFuture,该类可以帮助你实现上面所说的任务顺序调度,不相干的程序依然在异步,相干的存在先后顺序的将会通过一定的设置来满足自己的顺序期望。

二、CompletableFuture

现在再来假设一个例子,现在存在以下几个方法的调用:

zero方法、a方法、b方法、ab方法、c方法、d方法、e方法

定义如下:


    //各个方法,sleep当成是执行时间
    
    private void zero() {
        sleep(100L);
        System.out.println("zero方法触发!
-----------------------------");
    }

    private String a() {
        sleep(500L);
        return "a";
    }

    private String b(String a) {
        sleep(1000L);
        return a + "b";
    }

    private String c() {
        sleep(500L);
        return "c";
    }

    private String ab(String a, String b) {
        sleep(100L);
        return a + "|" + b;
    }

    private void d(String a) {
        sleep(1000L);
        System.out.println("d方法触发,拿到的a = " + a);
    }

    private String e(String a) {
        sleep(100L);
        return a + "e";
    }

根据上面的方法定义,可以整理出来其执行关系:

zero、a、c都是独立调用的方法,而b、d、e方法都需要拿到a的执行结果值才能触发,ab方法则要求更加苛刻,需要同时拿到a和b的执行结果才可以触发,现在假设需要把所有的方法都触发一遍,我们又期望通过异步的方式来尽可能的优化代码,这个时候如果还用上面例子里的方式,恐怕就很难进行下去了,因为很多方法存在相互依赖的现象,不过现在有了CompletableFuture,这个问题就可以解决了,来看下代码(方法及作用都写在注释上了,下面的文章就不多做说明了):


public static void main(String[] args) throws ExecutionException, InterruptedException {

        long s = System.currentTimeMillis();
        Test t = new Test();

        //runAsync用于执行没有返回值的异步任务
        CompletableFuture future0 = CompletableFuture.runAsync(t::zero)
                .exceptionally(e -> {
                    System.out.println("Zero出错!");
                    return null;
                }); //这里是异常处理,指的是该异步任务执行中出错,应该做的处理

        //supplyAsync方法用于执行带有返回值的异步任务
        CompletableFuture futureA = CompletableFuture.supplyAsync(t::a)
                .exceptionally(e -> {
                    System.out.println("方法A出错!");
                    return null;
                });

        //thenCompose方法用于连接两个CompletableFuture任务,如下代表futureA结束后将执行结果交由另外一个CompletableFuture处理,然后将执行链路最终赋值给futureB
        CompletableFuture futureB = futureA.thenCompose(a -> CompletableFuture.supplyAsync(() -> t.b(a)))
                .exceptionally(e -> {
                    System.out.println("方法B出错!");
                    return null;
                });

        //thenAccept方法用于将一个任务的结果,传给需要该结果的任务,如下表示futureD的执行需要futureA的结果,与thenApply不同的是,这个方法没有有返回值
        CompletableFuture futureD = futureA.thenAccept(t::d);

        //thenApply方法用于将一个任务的结果,传给需要该结果的任务,如下表示futureE的执行需要futureA的结果,与thenAccept不同的是,这个方法有返回值
        CompletableFuture futureE = futureA.thenApply(t::e)
                .exceptionally(e -> {
                    System.out.println("方法E出错!");
                    return null;
                });

        /**
         * thenApply方法概念容易与thenCompose混淆,毕竟最终目的很相似
         */

        //thenCombine方法用于连接多个异步任务的结果,如下ab方法需要futureA和futureB的执行结果,那么就可以使用thenCombine进行连接
        //注意,执行到ab这里,说明futureA和futureB一定已经执行完了
        CompletableFuture futureAB = futureA.thenCombine(futureB, t::ab)
                .exceptionally(e -> {
                    System.out.println("方法AB出错!");
                    return null;
                });

        //单纯的一个异步任务,不依赖任何其他任务
        CompletableFuture futureC = CompletableFuture.supplyAsync(t::c)
                .exceptionally(e -> {
                    System.out.println("方法C出错!");
                    return null;
                });

        //allOf如果阻塞结束则表示所有任务都执行结束了
        CompletableFuture.allOf(future0, futureA, futureB, futureAB, futureC, futureD, futureE).get();

        System.out.println("方法Zero输出:" + future0.get());
        System.out.println("方法A输出:" + futureA.get());
        System.out.println("方法B输出:" + futureB.get());
        System.out.println("方法AB输出:" + futureAB.get());
        System.out.println("方法C输出:" + futureC.get());
        System.out.println("方法D输出:" + futureD.get());
        System.out.println("方法E输出:" + futureE.get());
        System.out.println("耗时:" + (System.currentTimeMillis() - s) + "ms");
    }

输出结果如下:


zero方法触发!
-----------------------------
d方法触发,拿到的a = a
方法Zero输出:null
方法A输出:a
方法B输出:ab
方法AB输出:a|ab
方法C输出:c
方法D输出:null
方法E输出:ae
耗时:1668ms

可以看到,逻辑方面是没有任何问题的,也按照预期的顺序和方式进行了,注意看这里的运行时间,约等于1600ms,与第一个例子时长取决于执行时间最长的那个方法不同,上面的例子时长取决于有序的执行链的耗时最长的执行时间,分析下上面的程序,顺序链最长的,就是ab这条,ab需要a和b全部执行完,而b又依赖a的结果,因此ab执行完的时间就是500+1000的时间(a需要500ms,b又需要等待a,500ms后b触发,b自身又需要1000ms,等都结束了,再触发ab方法,而ab方法又需要100ms的执行时间,因此ab是最长的耗时方法,ab耗时=500+1000+100)

需要说明的是上述例子里用到的方法,几乎每个都有个重载方法,用来传递一个线程池对象,例子里用的都是不传的,用的是其内部的ForkJoinPool.commonPool()。

CompletableFuture的用法还有很多很多,较常用的应该就是例子里的几种,更多的用法以后会继续记录到这里。

原文地址:https://www.cnblogs.com/hama1993/p/10534202.html