consumer filter

ProtocolFilterWrapper中buildInvokerChain方法把Filter链在一起,调用执行的时候,逐个执行filter,最后执行filter中的invoker。

//ProtocolFilterWrapper
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (filters.size() > 0) {
        for (int i = filters.size() - 1; i >= 0; i --) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }
                public URL getUrl() {
                    return invoker.getUrl();
                }
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }
                public void destroy() {
                    invoker.destroy();
                }
                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

上面代码中,last的视图:

我们这样看:匿名内部类 new Invoker<T> { ... }中,

封装了final Filter filter,final Invoker<T> next,还有final Invoker<T> invoker,然后把filter链在一起。

consumer端一共有3个filter:ConsumerContextFilter, MonitorFilter, FutureFilter

1. ConsumerContextFilter 对RpcContext做设置和清理

@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), 
                                  invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation)invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }

}

2. MonitorFilter 统计接口方法调用信息

3. FutureFilter  处理异步调用

@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {}

在DubboInvoker.doInvoke方法中,
如果是异步,则在RpcContext设置Future,并返回空的RpcResult:

ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();

如果是同步,则调用Future.get()方法而阻塞:

RpcContext.getContext().setFuture(null);
ResponseFuture future = currentClient.request(inv, timeout);
Result result = (Result)future.get();
return  result;

FutureAdapter实现了Future接口,内部封装了一个ResponseFuture对象。
ResponseFuture是一个接口,DefaultFuture是主要实现类。

FutureFilter.asyncCallback()为DefaultFuture设置回调函数:

//FutureFilter.asyncCallback
ResponseFuture future = ((FutureAdapter<?>)f).getFuture();
future.setCallback(new ResponseCallback() {
    public void done(Object rpcResult) {
        ...
    }
    public void caught(Throwable exception) {
        ...
    }
});
//DefaultFuture
public void setCallback(ResponseCallback callback) {
    if (isDone()) {
        invokeCallback(callback);
    } else {
        boolean isdone = false;
        lock.lock();
        try{
            if (!isDone()) {
                this.callback = callback;
            } else {
                isdone = true;
            }
        }finally {
            lock.unlock();
        }
        if (isdone){
            invokeCallback(callback);
        }
    }
}
private void invokeCallback(ResponseCallback c){
    ResponseCallback callbackCopy = c;
    if (callbackCopy == null){
        throw new NullPointerException("callback cannot be null.");
    }
    c = null;
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null. url:"+channel.getUrl());
    }
    
    if (res.getStatus() == Response.OK) {
        try {
            callbackCopy.done(res.getResult());
        } catch (Exception e) {
            logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e);
        }
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        try {
            TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
            callbackCopy.caught(te);
        } catch (Exception e) {
            logger.error("callback invoke error ,url:" + channel.getUrl(), e);
        }
    } else {
        try {
            RuntimeException re = new RuntimeException(res.getErrorMessage());
            callbackCopy.caught(re);
        } catch (Exception e) {
            logger.error("callback invoke error ,url:" + channel.getUrl(), e);
        }
    }
}
原文地址:https://www.cnblogs.com/allenwas3/p/8352157.html