elasticsearch之请求处理流程(Rest/RPC )

.Action概述

 ES提供client供集群节点或java客户端访问集群用。client模块通过代理模式,将所有的操作都集成到client接口中。这样外部调用只需要初始化client就能够完成所有的调用功能。在每个方法后面都有一个***action来承接相应的功能。elasticsearch中的绝大部分操作都是通过相应的action,这些action在action包中。它的结构如下图所示:

上图是action包的部分截图,这里面对应着各个功能的action。各个action的包也都非常类似于index。这些action的实现也非常类似,都是基础自action,下图是indexaction的继承关系:

因为这些action并未真正实现相应的功能,只是一个代理,因此实现上也非常简单。他们的主要作用是提供新建response和request的方法及对应的action名称。还拿indexaction为例,它的方法图如下所示:

可以看到它只是提供了两个新建response和request的方法,及一个字NAME字段,这个NAME字段会用于后面action调用中。每个action对应的功能实现是在对应的transportAction中。

实际上***action也并非是真正的功能实现者,它只是一个代理,它的真正实现者是transport***Action.在ES中,Transport*Action 是比较核心的类集合。这里至少有两组映射关系。

Action -> Transport*Action
Transport*Action -> *TransportHandler

对应的功能是,可以通过Action 找到对应的TransportAction,这些TransportAction 如果是query类,则会调用SearchServiceTransportAction,并且通过第二层映射找到对应的Handler,否则可能就直接通过对应的Service完成操作。

第一层映射关系由类似下面的代码在ActionModule中完成:

第二层映射目前看来只有在查询相关的功能才有,其他的Transport*Action 则只调用对应的Service 来完成实际的操作。类似 SearchServiceTransportAction ,可以看做是SearchService进一步封装。如下:

2.rest请求到action的映射

对于java的Client请求,可以直接找到对应的action请求,但是ES为了提供更为通用的restful请求,通过restControl进行了由http到action的映射。

首先,每一个action在初始化时,针对自身能处理的action向restController进行注册,以RestCreateIndexAction为例,

    public RestCreateIndexAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        controller.registerHandler(RestRequest.Method.PUT, "/{index}", this);
        controller.registerHandler(RestRequest.Method.POST, "/{index}", this);
    }

restController维护了几个队列,用来处理不同类型的http请求。

 /**
     * Registers a rest handler to be execute when the provided method and path match the request.
     */
    public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
        switch (method) {
            case GET:
                getHandlers.insert(path, handler);
                break;
            case DELETE:
                deleteHandlers.insert(path, handler);
                break;
            case POST:
                postHandlers.insert(path, handler);
                break;
            case PUT:
                putHandlers.insert(path, handler);
                break;
            case OPTIONS:
                optionsHandlers.insert(path, handler);
                break;
            case HEAD:
                headHandlers.insert(path, handler);
                break;
            default:
                throw new IllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
        }
    }

 当httpServer收到请求时候,优先进行plugin处理,然后转发给 RestController来处理。

    public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {
        String rawPath = request.rawPath();
        if (rawPath.startsWith("/_plugin/")) {
            RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);
            filterChain.continueProcessing(request, channel);
            return;
        } else if (rawPath.equals("/favicon.ico")) {
            handleFavicon(request, channel);
            return;
        }
        restController.dispatchRequest(request, channel);
    }

 RestController收到请求后,如果有filter,先进行r过滤。然后进行executeHandler,最后通过channel返回操作结果

void executeHandler(RestRequest request, RestChannel channel) throws Exception {
        final RestHandler handler = getHandler(request);
        if (handler != null) {
            handler.handleRequest(request, channel); //根据action注册的path,调用对应action的handleRequest方法,在该方法内通过channel返回操作结果
} else { if (request.method() == RestRequest.Method.OPTIONS) { // when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added) channel.sendResponse(new BytesRestResponse(OK)); } else { channel.sendResponse(new BytesRestResponse(BAD_REQUEST, "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]")); } } }
原文地址:https://www.cnblogs.com/wzj4858/p/8124780.html