dubbo源码阅读-Filter默认实现(十)

SPI配置的默认实现

cache=com.alibaba.dubbo.cache.filter.CacheFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter

Consumer

ConsumerContextFilter

记录一些基础信息到当前线程的RpcContext

/**
 * Consumer-side filter that stores basic information about the current call in the
 * {@code RpcContext} returned by {@code RpcContext.getContext()} before delegating
 * to the next invoker.
 */
@Activate(
        group = {"consumer"},
        order = -10000
)
public class ConsumerContextFilter implements Filter {
    public ConsumerContextFilter() {
    }

    /**
     * Populates the current {@code RpcContext} and forwards the call.
     *
     * @param invoker    the next invoker in the chain
     * @param invocation the call being made
     * @return whatever the next invoker returns
     * @throws RpcException propagated from the next invoker
     */
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Record invoker, invocation, local address (port 0) and the remote address taken
        // from the invoker's URL on the current context.
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }

        try {
            // Per the original author's note: consumer-side parameters travel with the
            // invocation, so custom data such as a traceId can be passed along this way.
            return invoker.invoke(invocation);
        } finally {
            // Always drop the attachments once the call is over, even if it failed.
            RpcContext.getContext().clearAttachments();
        }
    }
}

ActiveLimitFilter

例子

同时只支持1的并发量

<dubbo:method actives="1" ... />

源码

ActiveLimitFilter主要用于 限制同一个客户端对于一个服务端方法的并发调用量(客户端限流)。

/**
 * Consumer-side concurrency limiter: caps how many calls to one service method may be
 * in flight at the same time, based on the method's {@code actives} URL parameter.
 */
@Activate(
        group = {"consumer"},
        value = {"actives"}
)
public class ActiveLimitFilter implements Filter {
    public ActiveLimitFilter() {
    }

    /**
     * Waits for a free slot when a positive {@code actives} limit is configured, then
     * performs the call and wakes up one waiting thread afterwards.
     *
     * @param invoker    the next invoker in the chain
     * @param invocation the call being made
     * @return the result of the delegated call
     * @throws RpcException if no slot became free before the wait timed out, or if the
     *                      delegated call throws it
     */
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        // Configured limit for this method; 0 (the default) disables the limit.
        int max = invoker.getUrl().getMethodParameter(methodName, "actives", 0);
        // Status object for this service method; it is also used as the wait/notify monitor.
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());

        if (max > 0) {
            awaitFreeSlot(invoker, invocation, count, max);
        }

        try {
            long begin = System.currentTimeMillis();
            // Presumably marks the call as active on the RpcStatus counters.
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException failure) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw failure;
            }
        } finally {
            if (max > 0) {
                // Wake up one thread that is waiting for a slot.
                synchronized (count) {
                    count.notify();
                }
            }
        }
    }

    /**
     * Blocks on {@code count} while the active call count is at or above {@code max}.
     *
     * The loop re-checks the count after every wake-up because another thread may have
     * taken the freed slot in the meantime.
     */
    private static void awaitFreeSlot(Invoker<?> invoker, Invocation invocation, RpcStatus count, int max) {
        // The method's configured "timeout" parameter bounds the total waiting time.
        long timeout = (long) invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", 0);
        long start = System.currentTimeMillis();
        long remain = timeout;
        int active = count.getActive();
        if (active < max) {
            return;
        }

        synchronized (count) {
            while ((active = count.getActive()) >= max) {
                try {
                    count.wait(remain);
                } catch (InterruptedException ignored) {
                    // NOTE(review): the interrupt is swallowed and the thread's interrupt
                    // status is not restored; the loop simply re-evaluates the deadline.
                }

                long elapsed = System.currentTimeMillis() - start;
                remain = timeout - elapsed;
                // NOTE(review): with timeout == 0 this is true after the first wake-up.
                if (remain <= 0L) {
                    throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max);
                }
            }
        }
    }
}

FutureFilter

用于处理事件 异步回调 同步回调  异常回调

例子

1.自定义一个event接口

/**
 * Demo callback contract whose methods are wired to the {@code onthrow}, {@code onreturn}
 * and {@code oninvoke} attributes of a {@code <dubbo:method>} element.
 *
 * @author liqiang
 * @date 2019/11/28 10:34
 */
public interface IEvent {
    /**
     * Called when the remote call ends with an exception.
     *
     * For a two-parameter callback whose second parameter accepts {@code Object[]},
     * FutureFilter passes the exception and the original call arguments.
     *
     * @param throwable the exception raised by the call
     * @param args      the arguments of the original call
     */
    void onThrow(Throwable throwable, Object[] args);

    /**
     * Called when the remote call returns: from the response callback when
     * {@code async} is true, otherwise right after the synchronous call completes.
     *
     * NOTE(review): for a one-parameter callback FutureFilter passes only the call's
     * return value, so an {@code Object[]} parameter may not match — confirm.
     *
     * @param params the value handed over by the filter
     */
    void onReturn(Object[] params);

    /**
     * Called before the remote call is made. The parameter list must match the
     * parameter list of the invoked service method, because the call arguments are
     * passed through as-is.
     *
     * The signature therefore depends on the service method and would normally live in
     * a separate interface; it is kept here only because this is a demo.
     *
     * @param prizeDrawReqDto the argument of the {@code prizeDraw} call
     */
    void onInvoke(PrizeDrawReqDto prizeDrawReqDto);
}

2.实现类

/**
 * Demo callback implementation that prints every event to standard output.
 *
 * @author liqiang
 * @date 2019/11/28 10:46
 */
public class PagePromotionServicePrizeDrawEvent implements IEvent {
    /**
     * Called when the remote call ends with an exception.
     *
     * @param throwable the exception raised by the call
     * @param args      the arguments of the original call
     */
    @Override
    public void onThrow(Throwable throwable, Object[] args) {
        // Arrays.toString prints the elements (and tolerates null); calling toString()
        // on the array itself would only print its type and identity hash.
        System.out.println(String.format("异常回调,参数:%s", java.util.Arrays.toString(args)));
    }

    /**
     * Called when the remote call returns: asynchronously when {@code async} is true,
     * otherwise right after the synchronous call completes.
     *
     * @param params the value handed over by the filter
     */
    @Override
    public void onReturn(Object[] params) {
        // The format string needs a %s specifier, otherwise the argument is ignored.
        System.out.println(String.format("onReturn回调:%s", java.util.Arrays.toString(params)));
    }

    /**
     * Called before the remote call is made; the parameter list must match the
     * parameter list of the invoked service method.
     *
     * @param prizeDrawReqDto the argument of the {@code prizeDraw} call
     */
    @Override
    public void onInvoke(PrizeDrawReqDto prizeDrawReqDto) {
        // %s formats null as "null", so no explicit toString() call (and no NPE) is needed.
        System.out.println(String.format("onInvoke执行前回调:%s", prizeDrawReqDto));
    }
}

3.consumer配置

    <!--定义回调实现类的bean-->
    <bean id="pagePromotionServicePrizeDrawEvent" class="com.bozhi.notify.PagePromotionServicePrizeDrawEvent"></bean>
    <dubbo:reference id="frontendPagePromotionService"
                     url="dubbo://127.0.0.1:23888/com.biz.soa.service.pagepromotion.frontend.PagePromotionService"
                     interface="com.biz.soa.service.pagepromotion.frontend.PagePromotionService" check="false">
        <dubbo:method  async="true"  name="prizeDraw"
                      onreturn="pagePromotionServicePrizeDrawEvent.onReturn"
                      oninvoke="pagePromotionServicePrizeDrawEvent.onInvoke"
                      onthrow="pagePromotionServicePrizeDrawEvent.onThrow"></dubbo:method>
    </dubbo:reference>

源码

@Activate(
        group = {"consumer"}//消费端的过滤器
)
public class FutureFilter implements Filter {
    protected static final Logger logger = LoggerFactory.getLogger(com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.class);

    public FutureFilter() {
    }

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //在url后面获取是否是异步
        boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
        //执行调用前时间 onInvoker
        this.fireInvokeCallback(invoker, invocation);
        //执行invoke
        Result result = invoker.invoke(invocation);
        //判断是同步还是异步
        if (isAsync) {
            //处理异步调用回调
            this.asyncCallback(invoker, invocation);
        } else {
            //处理同步调用回调
            this.syncCallback(invoker, invocation, result);
        }
        return result;
    }

    private void syncCallback(Invoker<?> invoker, Invocation invocation, Result result) {
        //是否发生异常 如果发生调用
        if (result.hasException()) {
            //调用异常回调
            this.fireThrowCallback(invoker, invocation, result.getException());
        } else {
            //触发onReturn
            this.fireReturnCallback(invoker, invocation, result.getValue());
        }

    }

    private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
        //获取异步调用的Future
        Future<?> f = RpcContext.getContext().getFuture();
        //判断是否是FutureAdapter适配器
        if (f instanceof FutureAdapter) {
            ResponseFuture future = ((FutureAdapter)f).getFuture();
            //设置异步回调
            future.setCallback(new ResponseCallback() {
                public void done(Object rpcResult) {
                    //获得返回结果 并不是provider返回值哦
                    if (rpcResult == null) {
                        com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                    } else if (!(rpcResult instanceof Result)) {
                        com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                    } else {
                        Result result = (Result)rpcResult;
                        //如果有异常触发异常回调
                        if (result.hasException()) {
                            //当前类是匿名类  com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this 为匿名类访问上层类对象的实例 就是FutureFilter实例
                            com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this.fireThrowCallback(invoker, invocation, result.getException());
                        } else {
                            //当前类是匿名类  com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this 为匿名类访问上层类对象的实例 就是FutureFilter实例
                            com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this.fireReturnCallback(invoker, invocation, result.getValue());
                        }

                    }
                }

                public void caught(Throwable exception) {
                    //异常回调
                    com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this.fireThrowCallback(invoker, invocation, exception);
                }
            });
        }

    }

    private void fireInvokeCallback(Invoker<?> invoker, Invocation invocation) {
        /**
         * 获取我们配置的oinvoke的方法
         * key为key为com.biz.soa.service.pagepromotion.frontend.PagePromotionService.prizeDraw.oninvoke.method
         */
        Method onInvokeMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "oninvoke.method"));
        /**
         * 获取我们配置的oninvoke实例
         * key为com.biz.soa.service.pagepromotion.frontend.PagePromotionService.prizeDraw.oninvoke.instance
         */
        Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "oninvoke.instance"));
        if (onInvokeMethod != null || onInvokeInst != null) {
            if (onInvokeMethod != null && onInvokeInst != null) {
                if (!onInvokeMethod.isAccessible()) {
                    onInvokeMethod.setAccessible(true);
                }

                Object[] params = invocation.getArguments();

                try {
                    //反射执行调用
                    onInvokeMethod.invoke(onInvokeInst, params);
                } catch (InvocationTargetException var7) {
                    //调用失败会触发异常回调
                    this.fireThrowCallback(invoker, invocation, var7.getTargetException());
                } catch (Throwable var8) {
                    //调用失败会触发异常回调
                    this.fireThrowCallback(invoker, invocation, var8);
                }

            } else {
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
            }
        }
    }

    private void fireReturnCallback(Invoker<?> invoker, Invocation invocation, Object result) {
        Method onReturnMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "onreturn.method"));
        Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "onreturn.instance"));
        if (onReturnMethod != null || onReturnInst != null) {
            if (onReturnMethod != null && onReturnInst != null) {
                if (!onReturnMethod.isAccessible()) {
                    onReturnMethod.setAccessible(true);
                }

                Object[] args = invocation.getArguments();
                Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
                Object[] params;
                if (rParaTypes.length > 1) {
                    if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                        params = new Object[]{result, args};
                    } else {
                        params = new Object[args.length + 1];
                        params[0] = result;
                        System.arraycopy(args, 0, params, 1, args.length);
                    }
                } else {
                    params = new Object[]{result};
                }

                try {
                    //反射调用
                    onReturnMethod.invoke(onReturnInst, params);
                } catch (InvocationTargetException var10) {
                    this.fireThrowCallback(invoker, invocation, var10.getTargetException());
                } catch (Throwable var11) {
                    this.fireThrowCallback(invoker, invocation, var11);
                }

            } else {
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
            }
        }
    }

    private void fireThrowCallback(Invoker<?> invoker, Invocation invocation, Throwable exception) {
        Method onthrowMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "onthrow.method"));
        Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "onthrow.instance"));
        if (onthrowMethod != null || onthrowInst != null) {
            if (onthrowMethod != null && onthrowInst != null) {
                if (!onthrowMethod.isAccessible()) {
                    onthrowMethod.setAccessible(true);
                }

                Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
                if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
                    try {
                        Object[] args = invocation.getArguments();
                        Object[] params;
                        if (rParaTypes.length > 1) {
                            if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                                params = new Object[]{exception, args};
                            } else {
                                params = new Object[args.length + 1];
                                params[0] = exception;
                                System.arraycopy(args, 0, params, 1, args.length);
                            }
                        } else {
                            params = new Object[]{exception};
                        }

                        onthrowMethod.invoke(onthrowInst, params);
                    } catch (Throwable var9) {
                        logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), var9);
                    }
                } else {
                    logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
                }

            } else {
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
            }
        }
    }
}

原理就是 获得我们对应的配置类 反射调用

DubboInvoker

但是有个疑惑:异步回调是怎么实现的?那么我们看invoker的实现 DubboInvoker

 /**
  * Sends the invocation through one of this invoker's exchange clients, as a one-way,
  * asynchronous or blocking call.
  *
  * @param invocation the call to send; it is cast to {@code RpcInvocation}
  * @return an empty {@code RpcResult} for one-way and asynchronous calls, otherwise the
  *         remote result
  * @throws RpcException with code 2 on timeout and code 1 on a remoting failure
  */
 protected Result doInvoke(Invocation invocation) throws Throwable {
        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
        String methodName = RpcUtils.getMethodName(invocation);
        rpcInvocation.setAttachment("path", this.getUrl().getPath());
        rpcInvocation.setAttachment("version", this.version);

        // A single client is used directly; otherwise rotate through the clients by index.
        ExchangeClient client = this.clients.length == 1
                ? this.clients[0]
                : this.clients[this.index.getAndIncrement() % this.clients.length];

        try {
            boolean async = RpcUtils.isAsync(this.getUrl(), invocation);
            // Per the original author's note this reflects the "return" setting, e.g.
            // <dubbo:method sent="true" return="true" async="true">.
            boolean oneway = RpcUtils.isOneway(this.getUrl(), invocation);
            int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000);

            // One-way is checked first, so it takes precedence over async.
            if (oneway) {
                // Per the original author's note: sent=true waits until the data has gone
                // out on the network, sent=false returns once it is handed to the IO write
                // buffer.
                boolean sent = this.getUrl().getMethodParameter(methodName, "sent", false);
                client.send(rpcInvocation, sent);
                RpcContext.getContext().setFuture((Future) null);
                return new RpcResult();
            }

            if (async) {
                // Expose the pending response through the context and return at once.
                ResponseFuture pending = client.request(rpcInvocation, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter(pending));
                return new RpcResult();
            }

            // Default: block on the response future, which makes the call synchronous.
            RpcContext.getContext().setFuture((Future) null);
            return (Result) client.request(rpcInvocation, timeout).get();
        } catch (TimeoutException timeoutFailure) {
            throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + timeoutFailure.getMessage(), timeoutFailure);
        } catch (RemotingException remotingFailure) {
            throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + remotingFailure.getMessage(), remotingFailure);
        }
    }

Provider

ContextFilter

filter链条顶端 主要在当前上下文设置一些基础信息

/**
 * Provider-side filter that stores basic information about the incoming call, plus a
 * filtered copy of its attachments, in the current thread's {@code RpcContext}, and
 * removes that context once the call has finished.
 */
@Activate(
        group = {"provider"},
        order = -10000
)
public class ContextFilter implements Filter {
    public ContextFilter() {
    }

    /**
     * Populates the current {@code RpcContext}, forwards the call and cleans up.
     *
     * @param invoker    the next invoker in the chain
     * @param invocation the incoming call
     * @return whatever the next invoker returns
     * @throws RpcException propagated from the next invoker
     */
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, String> attachments = invocation.getAttachments();
        if (attachments != null) {
            // Work on a copy and strip the entries that should not reach the context.
            attachments = new HashMap<String, String>(attachments);
            String[] strippedKeys = {"path", "group", "version", "dubbo", "token", "timeout", "async"};
            for (String key : strippedKeys) {
                attachments.remove(key);
            }
        }

        // Record invoker, invocation and the local address taken from the invoker's URL.
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

        if (attachments != null) {
            // Merge into existing context attachments, or install the filtered copy.
            if (RpcContext.getContext().getAttachments() == null) {
                RpcContext.getContext().setAttachments(attachments);
            } else {
                RpcContext.getContext().getAttachments().putAll(attachments);
            }
        }

        // Let the invocation know which invoker is handling it.
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }

        try {
            return invoker.invoke(invocation);
        } finally {
            // Drop the thread-bound context once the call is over.
            RpcContext.removeContext();
        }
    }
}

EchoFilter

回响测试主要用来检测服务是否正常(网络状态),单纯的检测网络情况的话其实不需要执行真正的业务逻辑的,所以通过Filter验证一下即可。

官方文档

consumer生成代理 强制实现了EchoService 我们强转为EchoService就能调用测试服务是否可用

/**
 * Echo test support: answers {@code $echo} calls directly so that service availability
 * can be checked without running any business logic.
 */
@Activate(
        group = {"provider"},
        order = -110000
)
public class EchoFilter implements Filter {
    public EchoFilter() {
    }

    /**
     * Returns the single argument of a {@code $echo} call as the result; every other
     * call is passed on to the next invoker.
     *
     * @param invoker the next invoker in the chain
     * @param inv     the incoming call
     * @return the echoed argument, or the delegated call's result
     * @throws RpcException propagated from the next invoker
     */
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        boolean echoCall = inv.getMethodName().equals("$echo")
                && inv.getArguments() != null
                && inv.getArguments().length == 1;
        if (echoCall) {
            // Echo test: hand the argument straight back.
            return new RpcResult(inv.getArguments()[0]);
        }
        return invoker.invoke(inv);
    }
}

ExecuteLimitFilter

例子

1.解决注解配置报错问题

参考:https://blog.csdn.net/xiao_jun_0820/article/details/81218440

 /**
     * Works around the failure to convert a {@code String[]} into a
     * {@code Map<String,String>} when the {@code parameters} attribute is set on the
     * {@code @Service} annotation.
     */
    @Component
    public static class ServiceParameterBeanPostProcessor extends InstantiationAwareBeanPostProcessorAdapter implements PriorityOrdered {

        @Override
        public int getOrder() {
            return PriorityOrdered.LOWEST_PRECEDENCE;
        }

        /**
         * For {@code ServiceBean} instances, converts the {@code parameters} property value
         * to a {@code Map} when a conversion is available and stores it as the converted
         * value. Other beans are left untouched.
         *
         * @return the same {@code pvs} instance that was passed in
         */
        @Override
        public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException {
            if (!(bean instanceof ServiceBean)) {
                return pvs;
            }

            PropertyValue parametersValue = pvs.getPropertyValue("parameters");
            ConversionService converter = getConversionService();

            boolean convertible = parametersValue != null
                    && parametersValue.getValue() != null
                    && converter.canConvert(parametersValue.getValue().getClass(), Map.class);
            if (convertible) {
                Map converted = converter.convert(parametersValue.getValue(), Map.class);
                parametersValue.setConvertedValue(converted);
            }
            return pvs;
        }

        /** Creates a conversion service with the two String[] converters registered. */
        private ConversionService getConversionService() {
            DefaultConversionService service = new DefaultConversionService();
            service.addConverter(new StringArrayToStringConverter());
            service.addConverter(new StringArrayToMapConverter());
            return service;
        }

    }

2.限流配置

/**
 * Business logic exposed to the front end. The Dubbo {@code @Service} annotation carries
 * the pair {@code "prizeDraw.executes", "1"} in its {@code parameters} attribute, which
 * is intended to limit {@code prizeDraw} to one concurrent execution.
 *
 * @author liqiang
 * @date 2019/10/15 19:33
 */
@Service("frontPagePromotionService")
@com.alibaba.dubbo.config.annotation.Service(
         parameters = {"prizeDraw.executes","1"}
)
public class PagePromotionServiceImpl extends AbstractBaseService implements PagePromotionService {
    /**
     * Draws a prize for the user in the given promotion (implementation elided in
     * this article).
     *
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public PrizeDrawResDto prizeDraw(PrizeDrawReqDto prizeDrawReqDto) throws Exception {
        ...
    }
}

源码

/**
 * 服务提供者限流
 */
@Activate(
        group = {"provider"},
        value = {"executes"}
)
public class ExecuteLimitFilter implements Filter {
    public ExecuteLimitFilter() {
    }

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        //获取我们配置的executes
        int max = url.getMethodParameter(methodName, "executes", 0);
        if (max > 0) {
            //获取当前请求的RpcStatus
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
            //获得信号量
            executesLimit = count.getSemaphore(max);
            //如果达到限流条件直接报错
            if (executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes="" + max + "" /> limited.");
            }
        }

        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        RpcStatus.beginCount(url, methodName);

        Result var12;
        try {
            Result result = invoker.invoke(invocation);
            var12 = result;
        } catch (Throwable var16) {
            isSuccess = false;
            if (var16 instanceof RuntimeException) {
                throw (RuntimeException)var16;
            }

            throw new RpcException("unexpected exception when ExecuteLimitFilter", var16);
        } finally {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if (acquireResult) {
                //请求完成释放限流
                executesLimit.release();
            }

        }

        return var12;
    }
}

ExceptionFilter

异常抛出规则

如果是 checked异常 则直接抛出;
如果是unchecked异常 但是在接口上有声明,也会直接抛出;
如果异常类和接口类在同一jar包里,直接抛出;
如果是 JDK自带的异常 ,直接抛出;
如果是 Dubbo的异常 ,直接抛出;
其余的都包装成RuntimeException然后抛出(避免异常在Client不能反序列化问题);

源码

@Activate(
        group = {"provider"}
)
public class ExceptionFilter implements Filter {
    private final Logger logger;

    public ExceptionFilter() {
        this(LoggerFactory.getLogger(com.alibaba.dubbo.rpc.filter.ExceptionFilter.class));
    }

    public ExceptionFilter(Logger logger) {
        this.logger = logger;
    }

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        try {
            //执行invoke
            Result result = invoker.invoke(invocation);
            //判断是否有异常
            if (result.hasException() && GenericService.class != invoker.getInterface()) {
                try {
                    Throwable exception = result.getException();
                    //如果不是runtime异常直接返回
                    if (!(exception instanceof RuntimeException) && exception instanceof Exception) {
                        return result;
                    } else {
                        try {
                            Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                            //获得方法上面声明的异常集合
                            Class<?>[] exceptionClassses = method.getExceptionTypes();
                            Class[] arr$ = exceptionClassses;
                            int len$ = exceptionClassses.length;

                            for(int i$ = 0; i$ < len$; ++i$) {
                                Class<?> exceptionClass = arr$[i$];
                                //如果异常等于声明的异常 直接返回 注意是equals
                                if (exception.getClass().equals(exceptionClass)) {
                                    return result;
                                }
                            }
                        } catch (NoSuchMethodException var11) {
                            return result;
                        }
                        //找到这里表示并不是声明的异常 比如声明thow Exception 抛出空指针异常  才会到这里
                        this.logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
                        String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                        String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                        //如果是其他jar包的异常 则包装成Runtime异常 抛出 避免客户端序列化失败问题
                        if (serviceFile != null && exceptionFile != null && !serviceFile.equals(exceptionFile)) {
                            String className = exception.getClass().getName();
                            if (!className.startsWith("java.") && !className.startsWith("javax.")) {
                                return (Result)(exception instanceof RpcException ? result : new RpcResult(new RuntimeException(StringUtils.toString(exception))));
                            } else {
                                return result;
                            }
                        } else {
                            return result;
                        }
                    }
                } catch (Throwable var12) {
                    this.logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + var12.getClass().getName() + ": " + var12.getMessage(), var12);
                    return result;
                }
            } else {
                return result;
            }
        } catch (RuntimeException var13) {
            this.logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + var13.getClass().getName() + ": " + var13.getMessage(), var13);
            throw var13;
        }
    }
}
原文地址:https://www.cnblogs.com/LQBlog/p/12502733.html