Dubbo的异步化调用

Dubbo2.7新特性主要三点(dubbo版本2.7.3):

  • 异步化改造
  • 三大中心改造
  • 服务治理增强

异步化改造

Dubbo的四种调用方式:

  • oneway
    一次调用,不需要返回,客户端线程请求发出即结束,立刻释放线程资源。
  • sync
    同步调用,客户端线程发送请求后,会阻塞,等到服务端返回后,才会重新唤醒线程,并继续执行后续代码步骤。
  • future
    异步化调用,客户端线程发送请求后,会继续执行后续代码,而不会等待服务端返回,当需要使用刚才请求的数据时,会调用future.get()主动去获取返回值,能很好的减少请求后数据返回的这段时间的性能开销。
  • callback
    异步化调用,这个和future的区别在于,future的返回结果是在当前线程中进行获取的,而callback是会新建一个线程,用来处理返回结果。

源码入口

通过查看dubbo的文档,找到下面这段话

在 Dubbo 的核心领域模型中:

Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。
可以看到,真正进行调用的是基于Invoker来实现的,在源码里,可以看到dubbo-rpc模块下有一个Invoker接口,提供了一个invoke方法
Result invoke(Invocation invocation) throws RpcException;

再看其类图

看到一个抽象类AbstractInvoker,实现了invoke方法

@Override
public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
    if (destroyed.get()) {
        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (CollectionUtils.isNotEmptyMap(attachment)) {
        invocation.addObjectAttachmentsIfAbsent(attachment);
    }

    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
        /**
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
             */
        invocation.addObjectAttachments(contextAttachments);
    }

    invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    AsyncRpcResult asyncResult;
    try {
        asyncResult = (AsyncRpcResult) doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        Throwable te = e.getTargetException();
        if (te == null) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
    }
    RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
    return asyncResult;
}

这里重点的方法asyncResult = (AsyncRpcResult) doInvoke(invocation);
调用了AbstractInvoker的抽象方法,doInvoke。由此可以看出是供子类进行扩展的。

DubboInvoker.doInvoke()

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        //我们要分析dubbo的四种调用方式,就从这个方法可以看出一些isOneway
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = calculateTimeout(invocation, methodName);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            CompletableFuture<AppResponse> appResponseFuture =
                currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

显然,RpcUtils.isOneway(getUrl(), invocation);该方法是去从已经加载的配置中,获取当前调用的服务的调用方式,判断是否是oneway的方式。
如果isOneway为true,则直接发起调用。
有个需要注意的地方,这里虽然是一次调用,但是返回的仍然是一个AsyncRpcResult对象,只是在AsyncRpcResult.newDefaultAsyncResult(invocation);这个方法里,默认将result赋值为了null,并调用future.complete()了,直接返回,并没有等待服务端的数据返回。
然后如果isOneway为false,则会进入else方法,从代码里看,好像都是进行异步调用。那么问题来了,同步调用去哪了呢?

这时候回去看上面的类图,发现有一个类AsyncToSyncInvoker,很明显,看名字就知道是异步转同步的类。

AsyncToSyncInvoker.invoker()

这里首先有个前提条件,dubbo在调用的时候,会组装一个调用链。而其实上面的DubboInvoker会被AsyncToSyncInvoker进行包装,AsyncToSyncInvoker有个内部变量private Invoker invoker;

@Override
public Result invoke(Invocation invocation) throws RpcException {
    // 调用dubboInvoker的invoke方法。
    Result asyncResult = invoker.invoke(invocation);

    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            /**
                 * NOTICE!
                 * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
                 * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
                 */
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remote result to return!  method: " +
                               invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                                   invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                                   invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else {
            throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
                                   invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}

通过dubboInvoker类的代码分析,我们知道当调用方式不是oneway的时候,都是会进行异步调用,这时候,结果会返回给AsyncToSyncInvoker,然后可以看到

if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
    /**
     * NOTICE!
     * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
     * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
     */
    asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}

AsyncToSyncInvoker中会进行同步判断,如果需要同步,会主动调用asyncResult.get()方法,实际上调用的就是上面返回的result中的future.get()方法。

总结

从上述代码的分析来看,dubbo的调用可以分为三个步骤,

  • 判断是否是oneway的方式,如果是,就直接发送请求并返回。
  • 如果不是oneway的方式,则直接采用异步调用
  • 在AsyncToSyncInvoker中判断是否需要同步,如果需要,则立即采用future.get()方法进行阻塞获取值。

参考

dubbo的异步化调用分析

原文地址:https://www.cnblogs.com/snail-gao/p/14157891.html