简易RPC框架-上下文

上下文

记的学英语的时候,总是不记的某个词是什么意思,然后就看不下去了,只能问周围的同学或者老师或者去查词典,他们的建议是通过上下文去推测这个词大概的意思,反正我那会上学时没理解,所以英文一直比较差。

现在英语水平也没提高多少,尽管有点领会。

后来慢慢理解了一些,因为有些词有很多种意思,放在某个场景下可能是一个意思,放在另外一个场景下又是其它的意思,这里不举例子了,上文有一定的相似度。

RPC客户端上下文

客户端由于是一个独立的环境,所以可以认为它处于一个属于自己的上下文中,与其它的端隔离。在上下文中可以指定一些公共的参数来提供给接口使用,比如:

RPC版本号
客户端请求ID
各类自定义的公共参数

RPC服务端上下文

服务端同理,也处于一个属于自己的上下文中,与其它端隔离。

RPC上下文的作用

基本线程级别的访问,让客户端或者服务端能够像访问本地变量一样访问RPC框架级别的变量。比如我们想将客户端的一个请求ID传递给服务端,这个请求ID作用于所有接口,比如RPC的调用链追踪,有两种方式:

接口中增加请求ID参数

这个方案显然是不能接受的,因为需要改的接口过多。

接口不改的情况下,在RPC框架中提供一个上下文,其中包含请求ID

这个方案显然成本最小,比如这种样调用:RpcContext.getRequestId();

  • 上图中的Context是指RPC框架级变量的一个本地副本,为了简化调用复杂度。
  • 上图中的Invocation是RPC框架级的变量,它与上面提到的Context相互配合,做到调用端与框架本身的解耦合

实现

定义上下文对象

在RpcContext对象中增加一个map类型的参数对象,可以存放任意扩展的参数。

public class RpcContext {

    private Map<String,Object> contextParameters;

    public Map<String, Object> getContextParameters() {
        return contextParameters;
    }

    public void setContextParameters(Map<String, Object> contextParameters) {
        this.contextParameters = contextParameters;
    }

    private static final ThreadLocal<RpcContext> rpcContextThreadLocal=new ThreadLocal<RpcContext>(){
        @Override
        protected RpcContext initialValue() {
            RpcContext context= new RpcContext();
            context.setContextParameters(Maps.newHashMap());
            return context;
        }
    };

    public static RpcContext getContext() {
        return rpcContextThreadLocal.get();
    }

    public static void removeContext() {
        rpcContextThreadLocal.remove();
    }

    private RpcContext() {
    }
}

RPC请求对象中增加上下文参数

RpcRequest增加如下字段,用于服务端调用。

private Map<String,Object> contextParameters;

RpcInvocation接口中增加上下文参数

在后续新增加的过滤器使用。

private Map<String,Object> contextParameters;

客户端代理

RpcProxy在组装RpcRequest对象时,从RpcContext中获取最新的参数传递给RpcReuest,从而传递给服务端。

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        RpcRequest request = new RpcRequest();
        //...

        request.setContextParameters(RpcContext.getContext().getContextParameters());

        //...
    }

客户端上下文过滤器

主要作用就是从本线线程变量中获取参数传递给RpcInvocation。

注解上的order属性文章后面详细介绍

@ActiveFilter(group = {ConstantConfig.CONSUMER},order = -1000)
public class ClientContextFilter implements RpcFilter {

    private final static Logger logger = LoggerFactory.getLogger(ClientContextFilter.class);

    @Override
    public Object invoke(RpcInvoker invoker, RpcInvocation invocation) {
        Map<String,Object> contextParameters=invocation.getContextParameters();
        if(null==contextParameters){
            contextParameters= Maps.newHashMap();
        }
        Map<String,Object> contextParametersFromRpcContext= RpcContext.getContext().getContextParameters();
        if(null!=contextParametersFromRpcContext) {
            contextParameters.putAll(contextParametersFromRpcContext);
        }
        Object rpcResponse=invoker.invoke(invocation);
        logger.info("ClientContextFilter.invoke end");
        return rpcResponse;
    }
}

服务端上下文过滤器

服务端上下文过滤器与客户端的作用相反,是从RpcInvocation中获取参数传递给本地线程变量RpcContext,后面在执行服务端方法时就可以方便的通过RpcContext获取指定变量。

@ActiveFilter(group = {ConstantConfig.PROVIDER},order = -1000)
public class ServerContextFilter implements RpcFilter {

    private final static Logger logger = LoggerFactory.getLogger(ServerContextFilter.class);

    @Override
    public Object invoke(RpcInvoker invoker, RpcInvocation invocation) {
        Map<String,Object> contextParameters=invocation.getContextParameters();
        RpcContext.getContext().setContextParameters(contextParameters);
        Object rpcResponse=invoker.invoke(invocation);
        logger.info("ServerContextFilter.invoke end");
        return rpcResponse;
    }
}

过滤器排序

因为我们的RpcContext是个本地线程变量,而且Rpc服务端是多线程处理业务,所以需要在请求结束后及时的清理掉相关本地线程变量信息。这就需要清理上下文的过滤动作在最后执行,否则有会出现服务端方法还没有执行就被清空了参数。创建一个工具类,专门用来处理获取客户端以及服务端过滤器。

过滤器注解增加排序属性

增加order字段,升级排列。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ActiveFilter {
    String[] group() default {};
    String[] value() default {};
    int order() default 999999999;
}

过滤器工具类

创建ActiveFilterUtil,包含下面两个函数。

过滤器排序函数

获取特定注解的类,然后根据注解上的排序属性升序排序。

private static List<Object> getActiveFilter(){
    List<Object> rpcFilterList= Lists.newArrayList();
    Map<String, Object> rpcFilterMapObject = ApplicationContextUtils.getApplicationContext().getBeansWithAnnotation(ActiveFilter.class);
    if (null!=rpcFilterMapObject) {
        rpcFilterList = Lists.newArrayList(rpcFilterMapObject.values());
        Collections.sort(rpcFilterList, new Comparator<Object>() {
            @Override
            public int compare(Object o1, Object o2) {
                ActiveFilter activeFilterO1 = o1.getClass().getAnnotation(ActiveFilter.class);
                ActiveFilter activeFilterO2 = o2.getClass().getAnnotation(ActiveFilter.class);
                return activeFilterO1.order() > activeFilterO2.order() ? 1 : -1;
            }
        });
    }
    return rpcFilterList;
}

获取RPC过滤器列表

提供给客户端以及服务端的一个协助方法,便于客户端以及服务构建过滤器职责链。

public static Map<String,RpcFilter> getFilterMap(boolean isServer){
    List<Object> rpcFilterList=getActiveFilter();
    Map<String,RpcFilter> filterMap=new HashMap<>();
    for (Object filterBean : rpcFilterList) {
        Class<?>[] interfaces = filterBean.getClass().getInterfaces();
        ActiveFilter activeFilter=filterBean.getClass().getAnnotation(ActiveFilter.class);
        String includeFilterGroupName=!isServer?ConstantConfig.CONSUMER:ConstantConfig.PROVIDER;
        if(null!=activeFilter.group()&& Arrays.stream(activeFilter.group()).filter(p->p.contains(includeFilterGroupName)).count()==0){
            continue;
        }
        for(Class<?> clazz:interfaces) {
            if(clazz.isAssignableFrom(RpcFilter.class)){
                filterMap.put(filterBean.getClass().getName(),(RpcFilter) filterBean);
            }
        }
    }
    return filterMap;
}

客户端以及服务端初始化

获取过滤器map的逻辑改为调用上面ActiveFilterUtil.getFilterMap方法。

使用示例

客户端设置参数

设置一个RPC版本号的参数。

@RequestMapping("/{productId}")
    public Product getById(@PathVariable final long productId) throws UnknownHostException {
        RpcContext.getContext().addContextParameter("rpc-version","1.0");
        //...
    }

服务端获取参数

简单的在服务端打印出RPC版本号。

 logger.info("get context parameter from server,rpc-version={}",String.valueOf(RpcContext.getContext().getContextParameter("rpc-version")));

输出日志如下:

本文源码

https://github.com/jiangmin168168/jim-framework

文中代码是依赖上述项目的,如果有不明白的可下载源码

原文地址:https://www.cnblogs.com/ASPNET2008/p/7749242.html