dubbo源码阅读-Filter默认实现(十一)之ContextFilter&ConsumerContextFilter

ContextFilter

/**
 * ContextInvokerFilter
 * 用于服务提供者 排序第一
 */
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, String> attachments = invocation.getAttachments();
        // 创建新的 `attachments` 集合,清理公用的隐式参数
        if (attachments != null) {
            attachments = new HashMap<String, String>(attachments);
            attachments.remove(Constants.PATH_KEY);
            attachments.remove(Constants.GROUP_KEY);
            attachments.remove(Constants.VERSION_KEY);
            attachments.remove(Constants.DUBBO_VERSION_KEY);
            attachments.remove(Constants.TOKEN_KEY);
            attachments.remove(Constants.TIMEOUT_KEY);
            attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
        }
        //设置到Context 线程缓存 存储了服务和调用方的相关信息
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
//                .setAttachments(attachments)  // merged from dubbox
                .setLocalAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());

        // mreged from dubbox
        // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
        if (attachments != null) {
            //避免在此之前 我们已经初始化了attachment 比如我们自定义过滤器 为-1
            if (RpcContext.getContext().getAttachments() != null) {
                RpcContext.getContext().getAttachments().putAll(attachments);
            } else {
                RpcContext.getContext().setAttachments(attachments);
            }
        }
        //为RpcInvocation设置Invoker

        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            //执行
            RpcResult result = (RpcResult) invoker.invoke(invocation);
            // pass attachments to result 返回结果增加入参的attatchments
            result.addAttachments(RpcContext.getServerContext().getAttachments());
            return result;
        } finally {
            //还原RPCContext 使用线程此技术 所以每次完毕还原
            RpcContext.removeContext();
            RpcContext.getServerContext().clearAttachments();
        }
    }
}

ConsumerContextFilter

/**
 * ConsumerContextInvokerFilter
 *应用于服务消费者
 */
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        //设置到上线文 同一个线程有效
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            RpcResult result = (RpcResult) invoker.invoke(invocation);
            RpcContext.getServerContext().setAttachments(result.getAttachments());
            return result;
        } finally {
            //清空
            RpcContext.getContext().clearAttachments();
        }
    }

}
原文地址:https://www.cnblogs.com/LQBlog/p/12505161.html