The Solr Server Query Process

The role of SolrDispatchFilter

As its class Javadoc says, this filter looks at the incoming URL and maps it to the handlers defined in solrconfig.xml.
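
For example, a request handler is registered in solrconfig.xml roughly like this (an illustrative fragment; the exact parameters depend on the installation):

<requestHandler name="/select" class="solr.SearchHandler">
  <lst name="defaults">
    <str name="echoParams">explicit</str>
    <str name="rows">10</str>
  </lst>
</requestHandler>

A request such as /solr/collection1/select?q=*:* is therefore routed by SolrDispatchFilter to this SearchHandler instance.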

The core of the filter's doFilter method:

HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);
try {
  Action result = call.call();
  switch (result) {
    case PASSTHROUGH:
      chain.doFilter(request, response);
      break;
    case RETRY:
      doFilter(request, response, chain, true);
      break;
    case FORWARD:
      request.getRequestDispatcher(call.getPath()).forward(request, response);
      break;
  }
} finally {
  call.destroy();
}

HttpSolrCall's call method then handles the request; the PROCESS case executes the handler and writes the response:

case PROCESS:
          final Method reqMethod = Method.getMethod(req.getMethod());
          HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
          // unless we have been explicitly told not to, do cache validation
          // if we fail cache validation, execute the query
          if (config.getHttpCachingConfig().isNever304() ||
              !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
            SolrQueryResponse solrRsp = new SolrQueryResponse();
              /* even for HEAD requests, we need to execute the handler to
               * ensure we don't get an error (and to make sure the correct
               * QueryResponseWriter is selected and we get the correct
               * Content-Type)
               */
            SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
            execute(solrRsp);
            HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
            Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
            while (headers.hasNext()) {
              Map.Entry<String, String> entry = headers.next();
              resp.addHeader(entry.getKey(), entry.getValue());
            }
            QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
            if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
            writeResponse(solrRsp, responseWriter, reqMethod);
          }
          return RETURN;
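
The isNever304() check above is driven by the requestDispatcher section of solrconfig.xml; a sketch of the relevant setting (the value is illustrative):

<requestDispatcher>
  <!-- never304="true" skips ETag/Last-Modified validation, so the handler always executes -->
  <httpCaching never304="true" />
</requestDispatcher>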

The request and response objects are then handed to SolrCore.execute, which delegates to the resolved request handler:

public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {
    if (handler==null) {
      String msg = "Null Request Handler '" +
        req.getParams().get(CommonParams.QT) + "'";

      if (log.isWarnEnabled()) log.warn(logid + msg + ":" + req);

      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, msg);
    }

    preDecorateResponse(req, rsp);

    if (requestLog.isDebugEnabled() && rsp.getToLog().size() > 0) {
      // log request at debug in case something goes wrong and we aren't able to log later
      requestLog.debug(rsp.getToLogAsString(logid));
    }

    // TODO: this doesn't seem to be working correctly and causes problems with the example server and distrib (for example /spell)
    // if (req.getParams().getBool(ShardParams.IS_SHARD,false) && !(handler instanceof SearchHandler))
    //   throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"isShard is only acceptable with search handlers");


    handler.handleRequest(req,rsp);
    postDecorateResponse(handler, req, rsp);

    if (rsp.getToLog().size() > 0) {
      if (requestLog.isInfoEnabled()) {
        requestLog.info(rsp.getToLogAsString(logid));
      }

      if (log.isWarnEnabled() && slowQueryThresholdMillis >= 0) {
        final long qtime = (long) (req.getRequestTimer().getTime());
        if (qtime >= slowQueryThresholdMillis) {
          log.warn("slow: " + rsp.getToLogAsString(logid));
        }
      }
    }
  }
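
From the client's point of view, this entire path is what a plain SolrJ query triggers. A minimal sketch, assuming a local server and a core named collection1 (5.x-era SolrJ API, matching the server code above):

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;

public class QueryExample {
  public static void main(String[] args) throws Exception {
    // assumed URL and core name; adjust to your deployment
    try (HttpSolrClient client = new HttpSolrClient("http://localhost:8983/solr/collection1")) {
      SolrQuery q = new SolrQuery("*:*");
      q.setRows(10);
      QueryResponse rsp = client.query(q); // goes to /select by default, i.e. the SearchHandler path described below
      System.out.println("numFound=" + rsp.getResults().getNumFound());
    }
  }
}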

handler.handleRequest lands in RequestHandlerBase, which applies the configured parameter sets and then calls the concrete handler's handleRequestBody:

/**
   * Initializes the {@link org.apache.solr.request.SolrRequestHandler} by creating three {@link org.apache.solr.common.params.SolrParams} named.
   * <table border="1" summary="table of parameters">
   * <tr><th>Name</th><th>Description</th></tr>
   * <tr><td>defaults</td><td>Contains all of the named arguments contained within the list element named "defaults".</td></tr>
   * <tr><td>appends</td><td>Contains all of the named arguments contained within the list element named "appends".</td></tr>
   * <tr><td>invariants</td><td>Contains all of the named arguments contained within the list element named "invariants".</td></tr>
   * </table>
   *
   * Example:
   * <pre>
   * &lt;lst name="defaults"&gt;
   * &lt;str name="echoParams"&gt;explicit&lt;/str&gt;
   * &lt;str name="qf"&gt;text^0.5 features^1.0 name^1.2 sku^1.5 id^10.0&lt;/str&gt;
   * &lt;str name="mm"&gt;2&lt;-1 5&lt;-2 6&lt;90%&lt;/str&gt;
   * &lt;str name="bq"&gt;incubationdate_dt:[* TO NOW/DAY-1MONTH]^2.2&lt;/str&gt;
   * &lt;/lst&gt;
   * &lt;lst name="appends"&gt;
   * &lt;str name="fq"&gt;inStock:true&lt;/str&gt;
   * &lt;/lst&gt;
   *
   * &lt;lst name="invariants"&gt;
   * &lt;str name="facet.field"&gt;cat&lt;/str&gt;
   * &lt;str name="facet.field"&gt;manu_exact&lt;/str&gt;
   * &lt;str name="facet.query"&gt;price:[* TO 500]&lt;/str&gt;
   * &lt;str name="facet.query"&gt;price:[500 TO *]&lt;/str&gt;
   * &lt;/lst&gt;
   * </pre>
   *
   *
   * @param args The {@link org.apache.solr.common.util.NamedList} to initialize from
   *
   * @see #handleRequest(org.apache.solr.request.SolrQueryRequest, org.apache.solr.response.SolrQueryResponse)
   * @see #handleRequestBody(org.apache.solr.request.SolrQueryRequest, org.apache.solr.response.SolrQueryResponse)
   * @see org.apache.solr.util.SolrPluginUtils#setDefaults(org.apache.solr.request.SolrQueryRequest, org.apache.solr.common.params.SolrParams, org.apache.solr.common.params.SolrParams, org.apache.solr.common.params.SolrParams)
   * @see SolrParams#toSolrParams(org.apache.solr.common.util.NamedList)
   *
   * See also the example solrconfig.xml located in the Solr codebase (example/solr/conf).
   */
@Override
  public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
    numRequests.incrementAndGet();
    TimerContext timer = requestTimes.time();
    try {
      if(pluginInfo != null && pluginInfo.attributes.containsKey(USEPARAM)) req.getContext().put(USEPARAM,pluginInfo.attributes.get(USEPARAM));
      SolrPluginUtils.setDefaults(this, req, defaults, appends, invariants);
      req.getContext().remove(USEPARAM);
      rsp.setHttpCaching(httpCaching);
      handleRequestBody( req, rsp );
      // count timeouts
      NamedList header = rsp.getResponseHeader();
      if(header != null) {
        Object partialResults = header.get("partialResults");
        boolean timedOut = partialResults == null ? false : (Boolean)partialResults;
        if( timedOut ) {
          numTimeouts.incrementAndGet();
          rsp.setHttpCaching(false);
        }
      }
    } catch (Exception e) {
      if (e instanceof SolrException) {
        SolrException se = (SolrException)e;
        if (se.code() == SolrException.ErrorCode.CONFLICT.code) {
          // TODO: should we allow this to be counted as an error (numErrors++)?

        } else {
          SolrException.log(SolrCore.log,e);
        }
      } else {
        SolrException.log(SolrCore.log,e);
        if (e instanceof SyntaxError) {
          e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
        }
      }

      rsp.setException(e);
      numErrors.incrementAndGet();
    }
    finally {
      timer.stop();
    }
  }
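
The defaults/appends/invariants merging done by SolrPluginUtils.setDefaults can be pictured roughly as below. This is a simplified, single-valued illustration, not the actual Solr implementation (real SolrParams are multi-valued and appends add extra values instead of concatenating strings):

import java.util.HashMap;
import java.util.Map;

public class ParamMergeSketch {
  // defaults   : used only when the request did not supply the parameter
  // appends    : added on top of whatever the request (or defaults) supplied
  // invariants : always win, overriding request, defaults and appends
  static Map<String, String> merge(Map<String, String> request,
                                   Map<String, String> defaults,
                                   Map<String, String> appends,
                                   Map<String, String> invariants) {
    Map<String, String> merged = new HashMap<>(defaults);
    merged.putAll(request);                                               // request overrides defaults
    appends.forEach((k, v) -> merged.merge(k, v, (a, b) -> a + "," + b)); // appends accumulate
    merged.putAll(invariants);                                            // invariants override everything
    return merged;
  }
}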

For searches the concrete handler is SearchHandler, whose handleRequestBody does the work:

@Override
  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception
  {
    List<SearchComponent> components  = getComponents();
    ResponseBuilder rb = new ResponseBuilder(req, rsp, components);
    if (rb.requestInfo != null) {
      rb.requestInfo.setResponseBuilder(rb);
    }

    boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false);
    rb.setDebug(dbg);
    if (dbg == false){//if it's true, we are doing everything anyway.
      SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG), rb);
    }

    final RTimerTree timer = rb.isDebug() ? req.getRequestTimer() : null;

    final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb); // creates a ShardHandler object only if it's needed
    
    if (timer == null) {
      // non-debugging prepare phase
      for( SearchComponent c : components ) {
        c.prepare(rb);
      }
    } else {
      // debugging prepare phase
      RTimerTree subt = timer.sub( "prepare" );
      for( SearchComponent c : components ) {
        rb.setTimer( subt.sub( c.getName() ) );
        c.prepare(rb);
        rb.getTimer().stop();
      }
      subt.stop();
    }

    if (!rb.isDistrib) {
      // a normal non-distributed request

      long timeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);
      if (timeAllowed > 0L) {
        SolrQueryTimeoutImpl.set(timeAllowed);
      }
      try {
        // The semantics of debugging vs not debugging are different enough that
        // it makes sense to have two control loops
        if(!rb.isDebug()) {
          // Process
          for( SearchComponent c : components ) {
            c.process(rb);
          }
        }
        else {
          // Process
          RTimerTree subt = timer.sub( "process" );
          for( SearchComponent c : components ) {
            rb.setTimer( subt.sub( c.getName() ) );
            c.process(rb);
            rb.getTimer().stop();
          }
          subt.stop();

          // add the timing info
          if (rb.isDebugTimings()) {
            rb.addDebugInfo("timing", timer.asNamedList() );
          }
        }
      } catch (ExitableDirectoryReader.ExitingReaderException ex) {
        log.warn( "Query: " + req.getParamString() + "; " + ex.getMessage());
        SolrDocumentList r = (SolrDocumentList) rb.rsp.getValues().get("response");
        if(r == null)
          r = new SolrDocumentList();
        r.setNumFound(0);
        rb.rsp.add("response", r);
        if(rb.isDebug()) {
          NamedList debug = new NamedList();
          debug.add("explain", new NamedList());
          rb.rsp.add("debug", debug);
        }
        rb.rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
      } finally {
        SolrQueryTimeoutImpl.reset();
      }
    } else {
      // a distributed request

      if (rb.outgoing == null) {
        rb.outgoing = new LinkedList<>();
      }
      rb.finished = new ArrayList<>();

      int nextStage = 0;
      do {
        rb.stage = nextStage;
        nextStage = ResponseBuilder.STAGE_DONE;

        // call all components
        for( SearchComponent c : components ) {
          // the next stage is the minimum of what all components report
          nextStage = Math.min(nextStage, c.distributedProcess(rb));
        }


        // check the outgoing queue and send requests
        while (rb.outgoing.size() > 0) {

          // submit all current request tasks at once
          while (rb.outgoing.size() > 0) {
            ShardRequest sreq = rb.outgoing.remove(0);
            sreq.actualShards = sreq.shards;
            if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
              sreq.actualShards = rb.shards;
            }
            sreq.responses = new ArrayList<>(sreq.actualShards.length); // presume we'll get a response from each shard we send to

            // TODO: map from shard to address[]
            for (String shard : sreq.actualShards) {
              ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
              params.remove(ShardParams.SHARDS);      // not a top-level request
              params.set(CommonParams.DISTRIB, "false");               // not a top-level request
              params.remove("indent");
              params.remove(CommonParams.HEADER_ECHO_PARAMS);
              params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
              params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
              params.set(ShardParams.SHARD_URL, shard); // so the shard knows what was asked
              if (rb.requestInfo != null) {
                // we could try and detect when this is needed, but it could be tricky
                params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
              }
              String shardQt = params.get(ShardParams.SHARDS_QT);
              if (shardQt != null) {
                params.set(CommonParams.QT, shardQt);
              } else {
                // for distributed queries that don't include shards.qt, use the original path
                // as the default but operators need to update their luceneMatchVersion to enable
                // this behavior since it did not work this way prior to 5.1
                if (req.getCore().getSolrConfig().luceneMatchVersion.onOrAfter(Version.LUCENE_5_1_0)) {
                  String reqPath = (String) req.getContext().get(PATH);
                  if (!"/select".equals(reqPath)) {
                    params.set(CommonParams.QT, reqPath);
                  } // else if path is /select, then the qt gets passed thru if set
                } else {
                  // this is the pre-5.1 behavior, which translates to sending the shard request to /select
                  params.remove(CommonParams.QT);
                }
              }
              shardHandler1.submit(sreq, shard, params);
            }
          }


          // now wait for replies, but if anyone puts more requests on
          // the outgoing queue, send them out immediately (by exiting
          // this loop)
          boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
          while (rb.outgoing.size() == 0) {
            ShardResponse srsp = tolerant ? 
                shardHandler1.takeCompletedIncludingErrors():
                shardHandler1.takeCompletedOrError();
            if (srsp == null) break;  // no more requests to wait for

            // Was there an exception?  
            if (srsp.getException() != null) {
              // If things are not tolerant, abort everything and rethrow
              if(!tolerant) {
                shardHandler1.cancelAll();
                if (srsp.getException() instanceof SolrException) {
                  throw (SolrException)srsp.getException();
                } else {
                  throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
                }
              } else {
                if(rsp.getResponseHeader().get("partialResults") == null) {
                  rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
                }
              }
            }

            rb.finished.add(srsp.getShardRequest());

            // let the components see the responses to the request
            for(SearchComponent c : components) {
              c.handleResponses(rb, srsp.getShardRequest());
            }
          }
        }

        for(SearchComponent c : components) {
          c.finishStage(rb);
        }

        // we are done when the next stage is MAX_VALUE
      } while (nextStage != Integer.MAX_VALUE);
    }
    
    // SOLR-5550: still provide shards.info if requested even for a short circuited distrib request
    if(!rb.isDistrib && req.getParams().getBool(ShardParams.SHARDS_INFO, false) && rb.shortCircuitedURL != null) {  
      NamedList<Object> shardInfo = new SimpleOrderedMap<Object>();
      SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();        
      if (rsp.getException() != null) {
        Throwable cause = rsp.getException();
        if (cause instanceof SolrServerException) {
          cause = ((SolrServerException)cause).getRootCause();
        } else {
          if (cause.getCause() != null) {
            cause = cause.getCause();
          }          
        }
        nl.add("error", cause.toString() );
        StringWriter trace = new StringWriter();
        cause.printStackTrace(new PrintWriter(trace));
        nl.add("trace", trace.toString() );
      }
      else {
        nl.add("numFound", rb.getResults().docList.matches());
        nl.add("maxScore", rb.getResults().docList.maxScore());
      }
      nl.add("shardAddress", rb.shortCircuitedURL);
      nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this request so far
      
      int pos = rb.shortCircuitedURL.indexOf("://");        
      String shardInfoName = pos != -1 ? rb.shortCircuitedURL.substring(pos+3) : rb.shortCircuitedURL;
      shardInfo.add(shardInfoName, nl);   
      rsp.getValues().add(ShardParams.SHARDS_INFO,shardInfo);            
    }
  }
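
Every SearchComponent in the chain receives a prepare callback and then a process callback (plus distributedProcess, handleResponses and finishStage in distributed mode). A minimal custom component might look like the following sketch; depending on the Solr version, additional abstract methods (for example getSource) may also have to be implemented:

import java.io.IOException;

import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.SearchComponent;

public class TaggingComponent extends SearchComponent {

  @Override
  public void prepare(ResponseBuilder rb) throws IOException {
    // called on every component before any process(); usually parses request params into rb
  }

  @Override
  public void process(ResponseBuilder rb) throws IOException {
    // called after earlier components (e.g. QueryComponent) have filled rb
    rb.rsp.add("taggingComponent", "processed");
  }

  @Override
  public String getDescription() {
    return "example component that only tags the response";
  }
}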

1. Single-node search: QueryComponent handles the request:

/**
   * Actually run the query
   */
  @Override
  public void process(ResponseBuilder rb) throws IOException
  {
    LOG.debug("process: {}", rb.req.getParams());
  
    SolrQueryRequest req = rb.req;
    SolrParams params = req.getParams();
    if (!params.getBool(COMPONENT_NAME, true)) {
      return;
    }
    SolrIndexSearcher searcher = req.getSearcher();

    StatsCache statsCache = req.getCore().getStatsCache();
    
    int purpose = params.getInt(ShardParams.SHARDS_PURPOSE, ShardRequest.PURPOSE_GET_TOP_IDS);
    if ((purpose & ShardRequest.PURPOSE_GET_TERM_STATS) != 0) {
      statsCache.returnLocalStats(rb, searcher);
      return;
    }
    // check if we need to update the local copy of global dfs
    if ((purpose & ShardRequest.PURPOSE_SET_TERM_STATS) != 0) {
      // retrieve from request and update local cache
      statsCache.receiveGlobalStats(req);
    }

    SolrQueryResponse rsp = rb.rsp;
    IndexSchema schema = searcher.getSchema();

    // Optional: This could also be implemented by the top-level searcher sending
    // a filter that lists the ids... that would be transparent to
    // the request handler, but would be more expensive (and would preserve score
    // too if desired).
    String ids = params.get(ShardParams.IDS);
    if (ids != null) {
      SchemaField idField = schema.getUniqueKeyField();
      List<String> idArr = StrUtils.splitSmart(ids, ",", true);
      int[] luceneIds = new int[idArr.size()];
      int docs = 0;
      for (int i=0; i<idArr.size(); i++) {
        int id = searcher.getFirstMatch(
                new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));
        if (id >= 0)
          luceneIds[docs++] = id;
      }

      DocListAndSet res = new DocListAndSet();
      res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
      if (rb.isNeedDocSet()) {
        // TODO: create a cache for this!
        List<Query> queries = new ArrayList<>();
        queries.add(rb.getQuery());
        List<Query> filters = rb.getFilters();
        if (filters != null) queries.addAll(filters);
        res.docSet = searcher.getDocSet(queries);
      }
      rb.setResults(res);

      ResultContext ctx = new ResultContext();
      ctx.docs = rb.getResults().docList;
      ctx.query = null; // anything?
      rsp.add("response", ctx);
      return;
    }

    // -1 as flag if not set.
    long timeAllowed = params.getLong(CommonParams.TIME_ALLOWED, -1L);
    if (null != rb.getCursorMark() && 0 < timeAllowed) {
      // fundamentally incompatible
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can not search using both " +
                              CursorMarkParams.CURSOR_MARK_PARAM + " and " + CommonParams.TIME_ALLOWED);
    }

    SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
    cmd.setTimeAllowed(timeAllowed);

    req.getContext().put(SolrIndexSearcher.STATS_SOURCE, statsCache.get(req));
    
    SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();

    //
    // grouping / field collapsing
    //
    GroupingSpecification groupingSpec = rb.getGroupingSpec();
    if (groupingSpec != null) {
      try {
        boolean needScores = (cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0;
        if (params.getBool(GroupParams.GROUP_DISTRIBUTED_FIRST, false)) {
          CommandHandler.Builder topsGroupsActionBuilder = new CommandHandler.Builder()
              .setQueryCommand(cmd)
              .setNeedDocSet(false) // Order matters here
              .setIncludeHitCount(true)
              .setSearcher(searcher);

          for (String field : groupingSpec.getFields()) {
            topsGroupsActionBuilder.addCommandField(new SearchGroupsFieldCommand.Builder()
                .setField(schema.getField(field))
                .setGroupSort(groupingSpec.getGroupSort())
                .setTopNGroups(cmd.getOffset() + cmd.getLen())
                .setIncludeGroupCount(groupingSpec.isIncludeGroupCount())
                .build()
            );
          }

          CommandHandler commandHandler = topsGroupsActionBuilder.build();
          commandHandler.execute();
          SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(searcher);
          rsp.add("firstPhase", commandHandler.processResult(result, serializer));
          rsp.add("totalHitCount", commandHandler.getTotalHitCount());
          rb.setResult(result);
          return;
        } else if (params.getBool(GroupParams.GROUP_DISTRIBUTED_SECOND, false)) {
          CommandHandler.Builder secondPhaseBuilder = new CommandHandler.Builder()
              .setQueryCommand(cmd)
              .setTruncateGroups(groupingSpec.isTruncateGroups() && groupingSpec.getFields().length > 0)
              .setSearcher(searcher);

          for (String field : groupingSpec.getFields()) {
            SchemaField schemaField = schema.getField(field);
            String[] topGroupsParam = params.getParams(GroupParams.GROUP_DISTRIBUTED_TOPGROUPS_PREFIX + field);
            if (topGroupsParam == null) {
              topGroupsParam = new String[0];
            }

            List<SearchGroup<BytesRef>> topGroups = new ArrayList<>(topGroupsParam.length);
            for (String topGroup : topGroupsParam) {
              SearchGroup<BytesRef> searchGroup = new SearchGroup<>();
              if (!topGroup.equals(TopGroupsShardRequestFactory.GROUP_NULL_VALUE)) {
                searchGroup.groupValue = new BytesRef(schemaField.getType().readableToIndexed(topGroup));
              }
              topGroups.add(searchGroup);
            }

            secondPhaseBuilder.addCommandField(
                new TopGroupsFieldCommand.Builder()
                    .setField(schemaField)
                    .setGroupSort(groupingSpec.getGroupSort())
                    .setSortWithinGroup(groupingSpec.getSortWithinGroup())
                    .setFirstPhaseGroups(topGroups)
                    .setMaxDocPerGroup(groupingSpec.getGroupOffset() + groupingSpec.getGroupLimit())
                    .setNeedScores(needScores)
                    .setNeedMaxScore(needScores)
                    .build()
            );
          }

          for (String query : groupingSpec.getQueries()) {
            secondPhaseBuilder.addCommandField(new QueryCommand.Builder()
                .setDocsToCollect(groupingSpec.getOffset() + groupingSpec.getLimit())
                .setSort(groupingSpec.getGroupSort())
                .setQuery(query, rb.req)
                .setDocSet(searcher)
                .build()
            );
          }

          CommandHandler commandHandler = secondPhaseBuilder.build();
          commandHandler.execute();
          TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
          rsp.add("secondPhase", commandHandler.processResult(result, serializer));
          rb.setResult(result);
          return;
        }

        int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
        boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
        Grouping.TotalCount defaultTotalCount = groupingSpec.isIncludeGroupCount() ?
            Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
        int limitDefault = cmd.getLen(); // this is normally from "rows"
        Grouping grouping =
            new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, groupingSpec.isMain());
        grouping.setSort(groupingSpec.getGroupSort())
            .setGroupSort(groupingSpec.getSortWithinGroup())
            .setDefaultFormat(groupingSpec.getResponseFormat())
            .setLimitDefault(limitDefault)
            .setDefaultTotalCount(defaultTotalCount)
            .setDocsPerGroupDefault(groupingSpec.getGroupLimit())
            .setGroupOffsetDefault(groupingSpec.getGroupOffset())
            .setGetGroupedDocSet(groupingSpec.isTruncateGroups());

        if (groupingSpec.getFields() != null) {
          for (String field : groupingSpec.getFields()) {
            grouping.addFieldCommand(field, rb.req);
          }
        }

        if (groupingSpec.getFunctions() != null) {
          for (String groupByStr : groupingSpec.getFunctions()) {
            grouping.addFunctionCommand(groupByStr, rb.req);
          }
        }

        if (groupingSpec.getQueries() != null) {
          for (String groupByStr : groupingSpec.getQueries()) {
            grouping.addQueryCommand(groupByStr, rb.req);
          }
        }

        if (rb.doHighlights || rb.isDebug() || params.getBool(MoreLikeThisParams.MLT, false)) {
          // we need a single list of the returned docs
          cmd.setFlags(SolrIndexSearcher.GET_DOCLIST);
        }

        grouping.execute();
        if (grouping.isSignalCacheWarning()) {
          rsp.add(
              "cacheWarning",
              String.format(Locale.ROOT, "Cache limit of %d percent relative to maxdoc has exceeded. Please increase cache size or disable caching.", maxDocsPercentageToCache)
          );
        }
        rb.setResult(result);

        if (grouping.mainResult != null) {
          ResultContext ctx = new ResultContext();
          ctx.docs = grouping.mainResult;
          ctx.query = null; // TODO? add the query?
          rsp.add("response", ctx);
          rsp.getToLog().add("hits", grouping.mainResult.matches());
        } else if (!grouping.getCommands().isEmpty()) { // Can never be empty since grouping.execute() checks for this.
          rsp.add("grouped", result.groupedResults);
          rsp.getToLog().add("hits", grouping.getCommands().get(0).getMatches());
        }
        return;
      } catch (SyntaxError e) {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
      }
    }

    // normal search result
    searcher.search(result, cmd);
    rb.setResult(result);

    ResultContext ctx = new ResultContext();
    ctx.docs = rb.getResults().docList;
    ctx.query = rb.getQuery();
    rsp.add("response", ctx);
    rsp.getToLog().add("hits", rb.getResults().docList.matches());

    if ( ! rb.req.getParams().getBool(ShardParams.IS_SHARD,false) ) {
      if (null != rb.getNextCursorMark()) {
        rb.rsp.add(CursorMarkParams.CURSOR_MARK_NEXT,
                   rb.getNextCursorMark().getSerializedTotem());
      }
    }

    if(rb.mergeFieldHandler != null) {
      rb.mergeFieldHandler.handleMergeFields(rb, searcher);
    } else {
      doFieldSortValues(rb, searcher);
    }

    doPrefetch(rb);
  }
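
The CURSOR_MARK_NEXT value added above is what enables cursor-based deep paging from the client side. A rough SolrJ sketch using the same 5.x-era API, assuming a local core named collection1 with uniqueKey field id (the sort must impose a total order including the uniqueKey, and, as the check above enforces, cursorMark cannot be combined with timeAllowed):

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.CursorMarkParams;

public class CursorPagingSketch {
  public static void main(String[] args) throws Exception {
    try (HttpSolrClient client = new HttpSolrClient("http://localhost:8983/solr/collection1")) {
      SolrQuery q = new SolrQuery("*:*");
      q.setRows(100);
      q.setSort("id", SolrQuery.ORDER.asc);                // total order including the uniqueKey
      String cursor = CursorMarkParams.CURSOR_MARK_START;  // "*"
      while (true) {
        q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursor);
        QueryResponse rsp = client.query(q);
        // ... consume rsp.getResults() ...
        String next = rsp.getNextCursorMark();
        if (cursor.equals(next)) break;                    // cursor did not advance: no more results
        cursor = next;
      }
    }
  }
}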

QueryComponent finally calls SolrIndexSearcher which, as its Javadoc puts it, "adds schema awareness and caching functionality over the Lucene IndexSearcher":

  public QueryResult search(QueryResult qr, QueryCommand cmd) throws IOException {
    getDocListC(qr,cmd);
    return qr;
  }
search() delegates to getDocListC, which first consults the queryResultCache; on a cache miss it falls back to getDocListNC, the uncached path:

private void getDocListNC(QueryResult qr,QueryCommand cmd) throws IOException {
    int len = cmd.getSupersetMaxDoc();
    int last = len;
    if (last < 0 || last > maxDoc()) last=maxDoc();
    final int lastDocRequested = last;
    int nDocsReturned;
    int totalHits;
    float maxScore;
    int[] ids;
    float[] scores;

    boolean needScores = (cmd.getFlags() & GET_SCORES) != 0;
    
    Query query = QueryUtils.makeQueryable(cmd.getQuery());

    ProcessedFilter pf = getProcessedFilter(cmd.getFilter(), cmd.getFilterList());
    if (pf.filter != null) {
      query = new FilteredQuery(query, pf.filter);
    }

    // handle zero case...
    if (lastDocRequested<=0) {
      final float[] topscore = new float[] { Float.NEGATIVE_INFINITY };
      final int[] numHits = new int[1];

      Collector collector;

      if (!needScores) {
        collector = new SimpleCollector () {
          @Override
          public void collect(int doc) {
            numHits[0]++;
          }
          
          @Override
          public boolean needsScores() {
            return false;
          }
        };
      } else {
        collector = new SimpleCollector() {
          Scorer scorer;
          @Override
          public void setScorer(Scorer scorer) {
            this.scorer = scorer;
          }
          @Override
          public void collect(int doc) throws IOException {
            numHits[0]++;
            float score = scorer.score();
            if (score > topscore[0]) topscore[0]=score;            
          }
          
          @Override
          public boolean needsScores() {
            return true;
          }
        };
      }
      
      buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);

      nDocsReturned=0;
      ids = new int[nDocsReturned];
      scores = new float[nDocsReturned];
      totalHits = numHits[0];
      maxScore = totalHits>0 ? topscore[0] : 0.0f;
      // no docs on this page, so cursor doesn't change
      qr.setNextCursorMark(cmd.getCursorMark());
    } else {
      final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd);
      Collector collector = topCollector;
      buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);

      totalHits = topCollector.getTotalHits();
      TopDocs topDocs = topCollector.topDocs(0, len);
      populateNextCursorMarkFromTopDocs(qr, cmd, topDocs);

      maxScore = totalHits>0 ? topDocs.getMaxScore() : 0.0f;
      nDocsReturned = topDocs.scoreDocs.length;
      ids = new int[nDocsReturned];
      scores = (cmd.getFlags()&GET_SCORES)!=0 ? new float[nDocsReturned] : null;
      for (int i=0; i<nDocsReturned; i++) {
        ScoreDoc scoreDoc = topDocs.scoreDocs[i];
        ids[i] = scoreDoc.doc;
        if (scores != null) scores[i] = scoreDoc.score;
      }
    }

    int sliceLen = Math.min(lastDocRequested,nDocsReturned);
    if (sliceLen < 0) sliceLen=0;
    qr.setDocList(new DocSlice(0,sliceLen,ids,scores,totalHits,maxScore));
  }
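
The non-zero-rows branch above is essentially the standard Lucene top-N collection pattern, with Solr's filter, post-filter and cursor handling wrapped around it. In plain Lucene (5.x era, to match the code above) the core of that pattern looks roughly like this:

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.store.Directory;

public class TopDocsSketch {
  static void printTopN(Directory dir, int n) throws Exception {
    try (DirectoryReader reader = DirectoryReader.open(dir)) {
      IndexSearcher searcher = new IndexSearcher(reader);
      TopScoreDocCollector collector = TopScoreDocCollector.create(n); // relevance-ranked top-N
      searcher.search(new MatchAllDocsQuery(), collector);
      TopDocs topDocs = collector.topDocs(0, n);                       // same call SolrIndexSearcher makes
      for (ScoreDoc sd : topDocs.scoreDocs) {
        System.out.println(sd.doc + " " + sd.score);
      }
    }
  }
}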

2. Distributed search across the cluster

HttpShardHandler submits the per-shard requests:

@Override
  public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
    // do this outside of the callable for thread safety reasons
    final List<String> urls = getURLs(sreq, shard);

    Callable<ShardResponse> task = new Callable<ShardResponse>() {
      @Override
      public ShardResponse call() throws Exception {

        ShardResponse srsp = new ShardResponse();
        if (sreq.nodeName != null) {
          srsp.setNodeName(sreq.nodeName);
        }
        srsp.setShardRequest(sreq);
        srsp.setShard(shard);
        SimpleSolrResponse ssr = new SimpleSolrResponse();
        srsp.setSolrResponse(ssr);
        long startTime = System.nanoTime();

        try {
          params.remove(CommonParams.WT); // use default (currently javabin)
          params.remove(CommonParams.VERSION);

          QueryRequest req = makeQueryRequest(sreq, params, shard);
          req.setMethod(SolrRequest.METHOD.POST);

          // no need to set the response parser as binary is the default
          // req.setResponseParser(new BinaryResponseParser());

          // if there are no shards available for a slice, urls.size()==0
          if (urls.size()==0) {
            // TODO: what's the right error code here? We should use the same thing when
            // all of the servers for a shard are down.
            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
          }

          if (urls.size() <= 1) {
            String url = urls.get(0);
            srsp.setShardAddress(url);
            try (SolrClient client = new HttpSolrClient(url, httpClient)) {
              ssr.nl = client.request(req);
            }
          } else {
            LBHttpSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
            ssr.nl = rsp.getResponse();
            srsp.setShardAddress(rsp.getServer());
          }
        }
        catch( ConnectException cex ) {
          srsp.setException(cex); //????
        } catch (Exception th) {
          srsp.setException(th);
          if (th instanceof SolrException) {
            srsp.setResponseCode(((SolrException)th).code());
          } else {
            srsp.setResponseCode(-1);
          }
        }

        ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

        return transfomResponse(sreq, srsp, shard);
      }
    };

    try {
      if (shard != null)  {
        MDC.put("ShardRequest.shards", shard);
      }
      if (urls != null && !urls.isEmpty())  {
        MDC.put("ShardRequest.urlList", urls.toString());
      }
      pending.add( completionService.submit(task) );
    } finally {
      MDC.remove("ShardRequest.shards");
      MDC.remove("ShardRequest.urlList");
    }
  }
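
The submit/take pairing relies on java.util.concurrent's CompletionService: each shard request becomes a Callable handed to an executor, and completed responses are consumed in whatever order they finish, which is what takeCompletedOrError builds on. A stripped-down sketch of the pattern:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletionService<String> completionService = new ExecutorCompletionService<>(pool);
    List<Future<String>> pending = new ArrayList<>();

    for (int shard = 0; shard < 3; shard++) {
      final int s = shard;
      pending.add(completionService.submit(() -> "response from shard " + s)); // like HttpShardHandler.submit
    }

    while (!pending.isEmpty()) {
      Future<String> done = completionService.take(); // blocks until some shard finishes, in completion order
      pending.remove(done);
      System.out.println(done.get());
    }
    pool.shutdown();
  }
}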

When a shard has more than one candidate URL, the request is routed through LBHttpSolrClient. The class-level Javadoc explains the design:

/**
 * LBHttpSolrClient or "LoadBalanced HttpSolrClient" is a load balancing wrapper around
 * {@link HttpSolrClient}. This is useful when you
 * have multiple Solr servers and the requests need to be Load Balanced among them.
 *
 * Do <b>NOT</b> use this class for indexing in master/slave scenarios since documents must be sent to the
 * correct master; no inter-node routing is done.
 *
 * In SolrCloud (leader/replica) scenarios, it is usually better to use
 * {@link CloudSolrClient}, but this class may be used
 * for updates because the server will forward them to the appropriate leader.
 *
 * <p>
 * It offers automatic failover when a server goes down and it detects when the server comes back up.
 * <p>
 * Load balancing is done using a simple round-robin on the list of servers.
 * <p>
 * If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
 * off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
 * This process is continued till it tries all the live servers. If at least one server is alive, the request succeeds,
 * and if not it fails.
 * <blockquote><pre>
 * SolrClient lbHttpSolrClient = new LBHttpSolrClient("http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
 * //or if you wish to pass the HttpClient do as follows
 * HttpClient httpClient = new HttpClient();
 * SolrClient lbHttpSolrClient = new LBHttpSolrClient(httpClient, "http://host1:8080/solr/", "http://host2:8080/solr", "http://host2:8080/solr");
 * </pre></blockquote>
 * This detects if a dead server comes alive automatically. The check is done in fixed intervals in a dedicated thread.
 * This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
 * <p>
 * <b>When to use this?</b><br> This can be used as a software load balancer when you do not wish to setup an external
 * load balancer. Alternatives to this code are to use
 * a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
 * href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
 *
 * @since solr 1.4
 */

HttpShardHandlerFactory's makeLoadBalancedRequest simply wraps the request and hands it to the shared load balancer:

  /**
   * Makes a request to one or more of the given urls, using the configured load balancer.
   *
   * @param req The solr search request that should be sent through the load balancer
   * @param urls The list of solr server urls to load balance across
   * @return The response from the request
   */
  public LBHttpSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
    throws SolrServerException, IOException {
    return loadbalancer.request(new LBHttpSolrClient.Req(req, urls));
  }

LBHttpSolrClient.request then tries the servers one by one:

 /**
   * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
   * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
   * time, or until a test request on that server succeeds.
   *
   * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
   * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
   * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
   *
   * If no live servers are found a SolrServerException is thrown.
   *
   * @param req contains both the request as well as the list of servers to query
   *
   * @return the result of the request
   *
   * @throws IOException If there is a low-level I/O error.
   */
  public Rsp request(Req req) throws SolrServerException, IOException {
    Rsp rsp = new Rsp();
    Exception ex = null;
    boolean isUpdate = req.request instanceof IsUpdateRequest;
    List<ServerWrapper> skipped = null;

    long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
    long timeOutTime = System.nanoTime() + timeAllowedNano;
    for (String serverStr : req.getServers()) {
      if(isTimeExceeded(timeAllowedNano, timeOutTime)) {
        break;
      }
      
      serverStr = normalize(serverStr);
      // if the server is currently a zombie, just skip to the next one
      ServerWrapper wrapper = zombieServers.get(serverStr);
      if (wrapper != null) {
        // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
        final int numDeadServersToTry = req.getNumDeadServersToTry();
        if (numDeadServersToTry > 0) {
          if (skipped == null) {
            skipped = new ArrayList<>(numDeadServersToTry);
            skipped.add(wrapper);
          }
          else if (skipped.size() < numDeadServersToTry) {
            skipped.add(wrapper);
          }
        }
        continue;
      }
      rsp.server = serverStr;
      try {
        MDC.put("LBHttpSolrClient.url", serverStr);
        HttpSolrClient client = makeSolrClient(serverStr);

        ex = doRequest(client, req, rsp, isUpdate, false, null);
        if (ex == null) {
          return rsp; // SUCCESS
        }
      } finally {
        MDC.remove("LBHttpSolrClient.url");
      }
    }

    // try the servers we previously skipped
    if (skipped != null) {
      for (ServerWrapper wrapper : skipped) {
        if(isTimeExceeded(timeAllowedNano, timeOutTime)) {
          break;
        }

        ex = doRequest(wrapper.client, req, rsp, isUpdate, true, wrapper.getKey());
        if (ex == null) {
          return rsp; // SUCCESS
        }
      }
    }


    if (ex == null) {
      throw new SolrServerException("No live SolrServers available to handle this request");
    } else {
      throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
    }

  }
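
Used on its own, the load balancer looks much like the Javadoc example above. A minimal sketch using that same 5.x-era constructor (server URLs are assumptions):

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;

public class LbClientSketch {
  public static void main(String[] args) throws Exception {
    // round-robins over the listed servers, skipping any that are currently in the zombie list
    try (LBHttpSolrClient lb = new LBHttpSolrClient("http://host1:8983/solr/collection1",
                                                    "http://host2:8983/solr/collection1")) {
      QueryResponse rsp = lb.query(new SolrQuery("*:*"));
      System.out.println("numFound=" + rsp.getResults().getNumFound());
    }
  }
}
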
Original article: https://www.cnblogs.com/davidwang456/p/4988775.html