solrCloud源码分析之CloudSolrClient

CloudSolrClient是SolrJ提供的、供客户端与SolrCloud交互的类。该类的实例与ZooKeeper通信,以确定SolrCloud各collection中的Solr endpoint,然后使用LBHttpSolrClient发送请求。

CloudSolrClient查询简单代码:

import java.io.IOException;

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.XMLResponseParser;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocumentList;

public class SolrJCloudClientSearch {

    /**
     * Runs a match-all query against the "Test" collection and prints the number
     * of documents contained in the returned result list.
     *
     * @param args ignored
     * @throws SolrServerException if the Solr server reports an error
     * @throws IOException if there is a communication error, including while closing the client
     */
    public static void main(String[] args) throws SolrServerException, IOException {
        String zkHost = "127.0.0.1:9983";
        // The client talks to ZooKeeper and to Solr over HTTP; try-with-resources
        // makes sure it is closed even when the query throws.
        try (CloudSolrClient server = new CloudSolrClient(zkHost)) {
            server.setParser(new XMLResponseParser());
            SolrQuery parameters = new SolrQuery();
            parameters.set("q", "*:*");
            parameters.set("qt", "/select");
            parameters.set("collection", "Test");
            QueryResponse response = server.query(parameters);
            SolrDocumentList list = response.getResults();
            System.out.println(list.size());
        }
    }
}

查询时流程

1. 调用方法query:

  /**
   * Executes a query against the Solr server without naming a collection explicitly.
   *
   * <p>Delegates to {@link #query(String, SolrParams)} with a {@code null} collection.
   *
   * @param params  the key/value parameters to send along with the request
   *
   * @return a {@link org.apache.solr.client.solrj.response.QueryResponse} holding the
   *         server's reply
   *
   * @throws IOException If there is a low-level I/O error.
   * @throws SolrServerException if there is an error on the server
   */
  public QueryResponse query(SolrParams params) throws SolrServerException, IOException {
    final String unspecifiedCollection = null;
    return query(unspecifiedCollection, params);
  }

接着调用带collection参数的query方法,它会构造QueryRequest并调用其process方法:

  /**
   * Executes a query against the given collection.
   *
   * <p>Wraps the parameters in a {@code QueryRequest} and processes it with this client.
   *
   * @param collection the Solr collection to query
   * @param params  the key/value parameters to send along with the request
   *
   * @return a {@link org.apache.solr.client.solrj.response.QueryResponse} holding the
   *         server's reply
   *
   * @throws IOException If there is a low-level I/O error.
   * @throws SolrServerException if there is an error on the server
   */
  public QueryResponse query(String collection, SolrParams params) throws SolrServerException, IOException {
    QueryRequest queryRequest = new QueryRequest(params);
    return queryRequest.process(this, collection);
  }

/**
   * Sends this request through the given {@link SolrClient} and wraps the raw reply
   * in a response object, recording how long the round trip took in milliseconds.
   *
   * @param client the SolrClient to communicate with
   * @param collection the collection to execute the request against
   *
   * @return the response
   *
   * @throws SolrServerException if there is an error on the Solr server
   * @throws IOException if there is a communication error
   */
  public final T process(SolrClient client, String collection) throws SolrServerException, IOException {
    final long startedAtMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    final T response = createResponse(client);
    response.setResponse(client.request(this, collection));
    final long finishedAtMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    response.setElapsedTime(finishedAtMillis - startedAtMillis);
    return response;
  }

调用客户端CloudSolrClient的request方法:

 /**
   * Resolves the target collection and hands the request to
   * {@code requestWithRetryOnStaleState} with a retry count of zero.
   *
   * <p>When {@code collection} is {@code null} it is taken from the request's
   * "collection" parameter, falling back to the default collection.
   *
   * @param request the request to send
   * @param collection the collection to execute the request against; may be null
   *
   * @return the response
   *
   * @throws SolrServerException if there is an error on the Solr server
   * @throws IOException if there is a communication error
   */
  @Override
  public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
    final SolrParams requestParams = request.getParams();

    String targetCollection = collection;
    if (targetCollection == null) {
      if (requestParams == null) {
        targetCollection = getDefaultCollection();
      } else {
        targetCollection = requestParams.get("collection", getDefaultCollection());
      }
    }
    return requestWithRetryOnStaleState(request, 0, targetCollection);
  }

完整的客户端调用过程如下:

 /**
   * Sends the request and retries it when the failure looks like it was caused by
   * stale cached collection state.
   *
   * <p>As this class doesn't watch external collections on the client side,
   * there's a chance that the request will fail due to cached stale state,
   * which means the state must be refreshed from ZK and retried.
   *
   * <p>An empty or {@code null} response is returned as-is; the trailing
   * {@code STATE_VERSION} entry is only looked up when the response has entries.
   *
   * @param request the request to send
   * @param retryCount how many retries have already happened; no further retry is
   *                   attempted once this reaches {@code MAX_STALE_RETRIES}
   * @param collection the collection the request targets; may be null
   *
   * @return the response
   *
   * @throws SolrServerException if there is an error on the Solr server
   * @throws IOException if there is a communication error
   */
  protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
      throws SolrServerException, IOException {

    connect(); // important to call this before you start working with the ZkStateReader

    // build up a _stateVer_ param to pass to the server containing all of the
    // external collection state versions involved in this request, which allows
    // the server to notify us that our cached state for one or more of the external
    // collections is stale and needs to be refreshed ... this code has no impact on internal collections
    String stateVerParam = null;
    List<DocCollection> requestedCollections = null;
    boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
    if (collection != null &&  !isAdmin) { // don't do _stateVer_ checking for admin requests
      Set<String> requestedCollectionNames = getCollectionNames(getZkStateReader().getClusterState(), collection);

      StringBuilder stateVerParamBuilder = null;
      for (String requestedCollection : requestedCollectionNames) {
        // track the version of state we're using on the client side using the _stateVer_ param
        DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection,null);
        int collVer = coll.getZNodeVersion();
        if (coll.getStateFormat()>1) {
          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
          requestedCollections.add(coll);

          if (stateVerParamBuilder == null) {
            stateVerParamBuilder = new StringBuilder();
          } else {
            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
          }

          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
        }
      }

      if (stateVerParamBuilder != null) {
        stateVerParam = stateVerParamBuilder.toString();
      }
    }

    if (request.getParams() instanceof ModifiableSolrParams) {
      ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
      if (stateVerParam != null) {
        params.set(STATE_VERSION, stateVerParam);
      } else {
        params.remove(STATE_VERSION);
      }
    } // else: ??? how to set this ???

    NamedList<Object> resp = null;
    try {
      resp = sendRequest(request, collection);
      //to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from there
      // Guard against a null/empty response: size()-1 would otherwise be -1 and the
      // lookup would fail, turning a harmless empty reply into an error.
      Object o = (resp == null || resp.size() == 0) ? null : resp.get(STATE_VERSION, resp.size() - 1);
      if (o instanceof Map) {
        //remove this because no one else needs this and tests would fail if they are comparing responses
        resp.remove(resp.size()-1);
        Map<?, ?> invalidStates = (Map<?, ?>) o;
        for (Map.Entry<?, ?> e : invalidStates.entrySet()) {
          getDocCollection(getZkStateReader().getClusterState(),(String)e.getKey(), (Integer)e.getValue());
        }

      }
    } catch (Exception exc) {

      Throwable rootCause = SolrException.getRootCause(exc);
      // don't do retry support for admin requests or if the request doesn't have a collection specified
      if (collection == null || isAdmin) {
        if (exc instanceof SolrServerException) {
          throw (SolrServerException)exc;
        } else if (exc instanceof IOException) {
          throw (IOException)exc;
        } else if (exc instanceof RuntimeException) {
          throw (RuntimeException) exc;
        } else {
          throw new SolrServerException(rootCause);
        }
      }

      int errorCode = (rootCause instanceof SolrException) ?
          ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;

      log.error("Request to collection {} failed due to ({}) {}, retry? {}",
          collection, errorCode, rootCause.toString(), retryCount);

      boolean wasCommError =
          (rootCause instanceof ConnectException ||
              rootCause instanceof ConnectTimeoutException ||
              rootCause instanceof NoHttpResponseException ||
              rootCause instanceof SocketException);

      boolean stateWasStale = false;
      if (retryCount < MAX_STALE_RETRIES  &&
          requestedCollections != null    &&
          !requestedCollections.isEmpty() &&
          SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
      {
        // cached state for one or more external collections was stale
        // re-issue request using updated state
        stateWasStale = true;

        // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
        for (DocCollection ext : requestedCollections) {
          collectionStateCache.remove(ext.getName());
        }
      }

      // if we experienced a communication error, it's worth checking the state
      // with ZK just to make sure the node we're trying to hit is still part of the collection
      if (retryCount < MAX_STALE_RETRIES &&
          !stateWasStale &&
          requestedCollections != null &&
          !requestedCollections.isEmpty() &&
          wasCommError) {
        for (DocCollection ext : requestedCollections) {
          DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(),null);
          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
            // looks like we couldn't reach the server because the state was stale == retry
            stateWasStale = true;
            // we just pulled state from ZK, so update the cache so that the retry uses it
            collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
          }
        }
      }

      if (requestedCollections != null) {
        requestedCollections.clear(); // done with this
      }

      // if the state was stale, then we retry the request once with new state pulled from Zk
      if (stateWasStale) {
        log.warn("Re-trying request to  collection(s) {} after stale state error from server.", collection);
        resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
      } else {
        // rethrow the original exception with its own type where the signature allows it
        if (exc instanceof SolrException) {
          throw (SolrException) exc;
        } else if (exc instanceof SolrServerException) {
          throw (SolrServerException)exc;
        } else if (exc instanceof IOException) {
          throw (IOException)exc;
        } else {
          throw new SolrServerException(rootCause);
        }
      }
    }

    return resp;
  }

上面的代码看着很多,其实大部分都是异常处理的,主干代码只有两个:

1.connect()

/**
   * Connect to the zookeeper ensemble.
   * This is an optional method that may be used to force a connect before any other requests are sent.
   *
   * <p>Does nothing when a {@code ZkStateReader} has already been set. If creating or
   * initialising the reader fails, the partially created reader (if any) is closed
   * before the exception is propagated.
   *
   * @throws ZooKeeperException if the thread is interrupted or ZooKeeper reports an error
   *                            while the reader is being set up
   */
  public void connect() {
    if (zkStateReader != null) {
      return;
    }
    synchronized (this) {
      if (zkStateReader != null) {
        return; // another thread finished connecting while we waited for the lock
      }
      ZkStateReader zk = null;
      try {
        zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
        zk.createClusterStateWatchersAndUpdate();
        zkStateReader = zk;
      } catch (InterruptedException e) {
        // null-check: closing must never mask the original failure with an NPE
        if (zk != null) zk.close();
        // restore the interrupted status for callers further up the stack
        Thread.currentThread().interrupt();
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
      } catch (KeeperException e) {
        if (zk != null) zk.close();
        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
      } catch (Exception e) {
        if (zk != null) zk.close();
        // do not wrap because clients may be relying on the underlying exception being thrown
        throw e;
      }
    }
  }

调用ZkStateReader的方法来监控:

 /**
   * Loads the current cluster state from ZooKeeper and registers the watchers that
   * keep it up to date.
   *
   * <p>In order, this method: checks that the {@code CLUSTER_STATE} node exists,
   * refreshes the legacy cluster state, the state-format-2 collections, the collection
   * list and the live nodes, builds the combined state under the update lock, registers
   * a watcher on the {@code ALIASES} node, updates the aliases, and, when a security
   * node listener is configured, registers a watcher on the security configuration node.
   *
   * @throws KeeperException if a ZooKeeper operation fails
   * @throws InterruptedException if the thread is interrupted during a ZooKeeper operation
   * @throws SolrException with {@code SERVICE_UNAVAILABLE} if the {@code CLUSTER_STATE}
   *                       node does not exist
   */
 public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
      InterruptedException {
    // We need to fetch the current cluster state and the set of live nodes

    log.info("Updating cluster state from ZooKeeper... ");

    // Sanity check ZK structure.
    if (!zkClient.exists(CLUSTER_STATE, true)) {
      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
              "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
    }

    // on reconnect of SolrZkClient force refresh and re-add watches.
    refreshLegacyClusterState(new LegacyClusterStateWatcher());
    refreshStateFormat2Collections();
    refreshCollectionList(new CollectionsChildWatcher());
    refreshLiveNodes(new LiveNodeWatcher());

    // Build the combined state and register the aliases watcher while holding the update lock.
    synchronized (ZkStateReader.this.getUpdateLock()) {
      constructState();

      zkClient.exists(ALIASES,
          new Watcher() {
            
            @Override
            public void process(WatchedEvent event) {
              // session events are not change events,
              // and do not remove the watcher
              if (EventType.None.equals(event.getType())) {
                return;
              }
              try {
                synchronized (ZkStateReader.this.getUpdateLock()) {
                  log.info("Updating aliases... ");

                  // remake watch
                  // (passing this watcher to getData re-registers it for the next change)
                  final Watcher thisWatch = this;
                  Stat stat = new Stat();
                  byte[] data = zkClient.getData(ALIASES, thisWatch, stat ,
                      true);

                  Aliases aliases = ClusterState.load(data);

                  ZkStateReader.this.aliases = aliases;
                }
              } catch (KeeperException e) {
                // An expired session or lost connection is only logged as a warning;
                // any other ZooKeeper error is logged and rethrown.
                if (e.code() == KeeperException.Code.SESSIONEXPIRED
                    || e.code() == KeeperException.Code.CONNECTIONLOSS) {
                  log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
                  return;
                }
                log.error("", e);
                throw new ZooKeeperException(
                    SolrException.ErrorCode.SERVER_ERROR, "", e);
              } catch (InterruptedException e) {
                // Restore the interrupted status
                Thread.currentThread().interrupt();
                log.warn("", e);
                return;
              }
            }
            
          }, true);
    }
    updateAliases();

    // Security configuration is only watched when a listener has been registered.
    if (securityNodeListener != null) {
      addSecuritynodeWatcher(SOLR_SECURITY_CONF_PATH, new Callable<Pair<byte[], Stat>>() {
        @Override
        public void call(Pair<byte[], Stat> pair) {
          ConfigData cd = new ConfigData();
          // A null or empty payload maps to EMPTY_MAP; otherwise the JSON is parsed and deep-copied.
          cd.data = pair.getKey() == null || pair.getKey().length == 0 ? EMPTY_MAP : Utils.getDeepCopy((Map) fromJSON(pair.getKey()), 4, false);
          // -1 is used as the version when no Stat is available.
          cd.version = pair.getValue() == null ? -1 : pair.getValue().getVersion();
          securityData = cd;
          securityNodeListener.run();
        }
      });
      securityData = getSecurityProps(true);
    }
  }

2. sendRequest()

方法如下:

/**
   * Works out which Solr URLs may serve the request and sends it through the
   * load-balancing client.
   *
   * <p>Update requests are first offered to {@code directUpdate}; if that yields a
   * response it is returned immediately. For admin paths the candidates are the base
   * URLs of all live nodes. Otherwise the candidates are the active replicas on live
   * nodes of the slices selected by the collection's router, shuffled; for update
   * requests leaders come first and the shuffled non-leader replicas are appended.
   *
   * @param request the request to send
   * @param collection the collection (or collections) the request targets
   *
   * @return the response returned by the load-balancing client
   *
   * @throws SolrServerException if no collection is specified for a non-admin request,
   *                             or the server reports an error
   * @throws IOException if there is a communication error
   */
protected NamedList<Object> sendRequest(SolrRequest request, String collection)
      throws SolrServerException, IOException {
    connect();

    final ClusterState clusterState = zkStateReader.getClusterState();

    boolean sendToLeaders = false;
    List<String> replicas = null;

    if (request instanceof IsUpdateRequest) {
      if (request instanceof UpdateRequest) {
        NamedList<Object> directResponse = directUpdate((AbstractUpdateRequest) request, collection, clusterState);
        if (directResponse != null) {
          return directResponse;
        }
      }
      sendToLeaders = true;
      replicas = new ArrayList<>();
    }

    SolrParams reqParams = request.getParams();
    if (reqParams == null) {
      reqParams = new ModifiableSolrParams();
    }

    List<String> theUrlList = new ArrayList<>();
    if (ADMIN_PATHS.contains(request.getPath())) {
      // admin requests may go to any live node
      for (String liveNode : clusterState.getLiveNodes()) {
        theUrlList.add(zkStateReader.getBaseUrlForNodeName(liveNode));
      }
    } else {

      if (collection == null) {
        throw new SolrServerException(
            "No collection param specified on request and no default collection has been set.");
      }

      final Set<String> collectionNames = getCollectionNames(clusterState, collection);
      if (collectionNames.isEmpty()) {
        throw new SolrException(ErrorCode.BAD_REQUEST,
            "Could not find collection: " + collection);
      }

      final String shardKeys = reqParams.get(ShardParams._ROUTE_);

      // TODO: not a big deal because of the caching, but we could avoid looking
      // at every shard
      // when getting leaders if we tweaked some things

      // Collect, per requested collection, the slices the router selects for this request.
      final Map<String,Slice> slices = new HashMap<>();
      for (String collectionName : collectionNames) {
        DocCollection col = getDocCollection(clusterState, collectionName, null);
        Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
      }
      final Set<String> liveNodes = clusterState.getLiveNodes();

      // build a map of unique nodes
      // TODO: allow filtering by group, role, etc
      final Map<String,ZkNodeProps> nodes = new HashMap<>();
      final List<String> preferredUrls = new ArrayList<>();
      for (Slice slice : slices.values()) {
        for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
          ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
          String node = coreNodeProps.getNodeName();

          // only active replicas hosted on live nodes are candidates
          boolean usable = liveNodes.contains(coreNodeProps.getNodeName())
              && Replica.State.getState(coreNodeProps.getState()) == Replica.State.ACTIVE;
          if (!usable) {
            continue;
          }
          // only the first replica seen on each node contributes a URL
          if (nodes.put(node, nodeProps) != null) {
            continue;
          }

          boolean preferred = !sendToLeaders || coreNodeProps.isLeader();
          String url;
          if (reqParams.get(UpdateParams.COLLECTION) == null) {
            url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), collection);
          } else {
            url = coreNodeProps.getCoreUrl();
          }

          if (preferred) {
            preferredUrls.add(url);
          } else {
            replicas.add(url);
          }
        }
      }

      theUrlList = new ArrayList<>(preferredUrls);
      if (theUrlList.isEmpty()) {
        for (String s : collectionNames) {
          if (s != null) {
            collectionStateCache.remove(s);
          }
        }
        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Could not find a healthy node to handle the request.");
      }

      Collections.shuffle(theUrlList, rand);
      if (sendToLeaders) {
        // non-leader replicas are appended after the leaders, in their own random order
        ArrayList<String> theReplicas = new ArrayList<>(replicas);
        Collections.shuffle(theReplicas, rand);
        theUrlList.addAll(theReplicas);
      }

    }

    LBHttpSolrClient.Req lbRequest = new LBHttpSolrClient.Req(request, theUrlList);
    LBHttpSolrClient.Rsp lbResponse = lbClient.request(lbRequest);
    return lbResponse.getResponse();
  }

上面代码的意思是根据请求类型(admin请求、更新请求或查询请求)来确定urlList,然后用LBHttpSolrClient.Req构造请求,交给LBHttpSolrClient获取响应。注意:LBHttpSolrClient.request会用for循环按顺序依次尝试列表中的server,直到某个server请求成功为止。

/**
   * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
   * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
   * time, or until a test request on that server succeeds.
   *
   * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
   * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
   * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
   *
   * If no live servers are found a SolrServerException is thrown.
   *
   * @param req contains both the request as well as the list of servers to query
   *
   * @return the result of the request
   *
   * @throws SolrServerException if no server could handle the request
   * @throws IOException If there is a low-level I/O error.
   */
  public Rsp request(Req req) throws SolrServerException, IOException {
    final Rsp result = new Rsp();
    final boolean isUpdate = req.request instanceof IsUpdateRequest;
    Exception lastFailure = null;
    List<ServerWrapper> deferredZombies = null;

    final long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
    final long deadlineNano = System.nanoTime() + timeAllowedNano;

    for (String rawUrl : req.getServers()) {
      if (isTimeExceeded(timeAllowedNano, deadlineNano)) {
        break;
      }

      final String serverUrl = normalize(rawUrl);
      final ServerWrapper zombie = zombieServers.get(serverUrl);
      if (zombie != null) {
        // the server is currently a zombie: remember it (up to the configured limit)
        // for a later attempt and move on to the next one
        final int maxDeadToTry = req.getNumDeadServersToTry();
        if (maxDeadToTry > 0) {
          if (deferredZombies == null) {
            deferredZombies = new ArrayList<>(maxDeadToTry);
          }
          if (deferredZombies.size() < maxDeadToTry) {
            deferredZombies.add(zombie);
          }
        }
        continue;
      }

      result.server = serverUrl;
      try {
        MDC.put("LBHttpSolrClient.url", serverUrl);
        HttpSolrClient liveClient = makeSolrClient(serverUrl);

        lastFailure = doRequest(liveClient, req, result, isUpdate, false, null);
        if (lastFailure == null) {
          return result; // SUCCESS
        }
      } finally {
        MDC.remove("LBHttpSolrClient.url");
      }
    }

    // try the servers we previously skipped
    if (deferredZombies != null) {
      for (ServerWrapper zombie : deferredZombies) {
        if (isTimeExceeded(timeAllowedNano, deadlineNano)) {
          break;
        }

        lastFailure = doRequest(zombie.client, req, result, isUpdate, true, zombie.getKey());
        if (lastFailure == null) {
          return result; // SUCCESS
        }
      }
    }

    if (lastFailure != null) {
      throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), lastFailure);
    }
    throw new SolrServerException("No live SolrServers available to handle this request");
  }

其有效代码(原文以红色标示)是:通过makeSolrClient创建HttpSolrClient,再调用doRequest发送请求。

/**
   * Sends the request to a single server and classifies any failure.
   *
   * <p>Failures considered retryable are returned, not thrown, so the caller can
   * try the next server; unless the server was already a zombie it is first passed to
   * {@code addZombie}. All other failures are thrown. A zombie server that answers
   * (even with a non-retryable Solr error) is removed from the zombie list.
   *
   * @param client the client for the server to try
   * @param req the load-balanced request
   * @param rsp receives the raw response on success
   * @param isUpdate whether the request is an update, which narrows what is retried
   * @param isZombie whether the server is currently in the zombie list
   * @param zombieKey key of the server in the zombie list; used only when {@code isZombie}
   *
   * @return {@code null} on success, otherwise the retryable failure
   *
   * @throws SolrServerException if the failure is not retryable
   * @throws IOException if a socket failure is not retryable
   */
protected Exception doRequest(HttpSolrClient client, Req req, Rsp rsp, boolean isUpdate,
      boolean isZombie, String zombieKey) throws SolrServerException, IOException {
    Exception failure = null;
    try {
      rsp.rsp = client.request(req.getRequest(), (String) null);
      if (isZombie) {
        zombieServers.remove(zombieKey);
      }
    } catch (SolrException e) {
      // we retry on 404 or 403 or 503 or 500
      // unless it's an update - then we only retry on connect exception
      final boolean retryable = !isUpdate && RETRY_CODES.contains(e.code());
      if (!retryable) {
        // Server is alive but the request was likely malformed or invalid
        if (isZombie) {
          zombieServers.remove(zombieKey);
        }
        throw e;
      }
      failure = isZombie ? e : addZombie(client, e);
    } catch (SocketException e) {
      if (isUpdate && !(e instanceof ConnectException)) {
        throw e;
      }
      failure = isZombie ? e : addZombie(client, e);
    } catch (SocketTimeoutException e) {
      if (isUpdate) {
        throw e;
      }
      failure = isZombie ? e : addZombie(client, e);
    } catch (SolrServerException e) {
      final Throwable rootCause = e.getRootCause();
      // updates are retried only on connect failures, everything else on any I/O failure
      final boolean retryable = isUpdate
          ? rootCause instanceof ConnectException
          : rootCause instanceof IOException;
      if (!retryable) {
        throw e;
      }
      failure = isZombie ? e : addZombie(client, e);
    } catch (Exception e) {
      throw new SolrServerException(e);
    }

    return failure;
  }

上述也只有一行有效代码,其余都是异常处理,调用HttpSolrClient的request方法:

  /**
   * Builds the HTTP request for the given Solr request, applies the basic-auth header,
   * and executes it.
   *
   * @param request the Solr request to send
   * @param processor the parser passed on to {@code executeMethod}
   * @param collection the collection passed on to {@code createMethod}
   *
   * @return the result of {@code executeMethod}
   *
   * @throws SolrServerException if there is an error on the Solr server
   * @throws IOException if there is a communication error
   */
  public NamedList<Object> request(final SolrRequest request, final ResponseParser processor, String collection)
      throws SolrServerException, IOException {
    final HttpRequestBase httpRequest = createMethod(request, collection);
    setBasicAuthHeader(request, httpRequest);
    final NamedList<Object> parsedResponse = executeMethod(httpRequest, processor);
    return parsedResponse;
  }

响应流程

  /**
   * Sends this request through the given {@link SolrClient} and wraps the raw reply
   * in a response object, recording how long the round trip took in milliseconds.
   *
   * @param client the SolrClient to communicate with
   * @param collection the collection to execute the request against
   *
   * @return the response
   *
   * @throws SolrServerException if there is an error on the Solr server
   * @throws IOException if there is a communication error
   */
  public final T process(SolrClient client, String collection) throws SolrServerException, IOException {
    final long startedAtMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    final T response = createResponse(client);
    response.setResponse(client.request(this, collection));
    final long finishedAtMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    response.setElapsedTime(finishedAtMillis - startedAtMillis);
    return response;
  }

先调用创建QueryResponse方法:

  /**
   * Creates the empty {@link QueryResponse} bound to the given client.
   *
   * @param client the client the response is created for
   *
   * @return a new QueryResponse
   */
  @Override
  protected QueryResponse createResponse(SolrClient client) {
    final QueryResponse emptyResponse = new QueryResponse(client);
    return emptyResponse;
  }

然后设置Response内容(即下面process方法中的res.setResponse(client.request(this, collection))一行):

/**
   * Sends this request through the given {@link SolrClient} and wraps the raw reply
   * in a response object, recording how long the round trip took in milliseconds.
   *
   * @param client the SolrClient to communicate with
   * @param collection the collection to execute the request against
   *
   * @return the response
   *
   * @throws SolrServerException if there is an error on the Solr server
   * @throws IOException if there is a communication error
   */
  public final T process(SolrClient client, String collection) throws SolrServerException, IOException {
    final long startedAtMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    final T response = createResponse(client);
    response.setResponse(client.request(this, collection));
    final long finishedAtMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    response.setElapsedTime(finishedAtMillis - startedAtMillis);
    return response;
  }

 小结:

 SolrCloud检索索引过程:

  1、用户的一个查询,可以发送到含有该Collection的任意Solr的Server,Solr内部处理的逻辑会转到一个Replica。

  2、此Replica会基于查询索引的方式,启动分布式查询,基于索引的Shard的个数,把查询转为多个子查询,并把每个子查询定位到对应Shard的任意一个Replica。

  3、每个子查询返回查询结果。

  4、最初的Replica合并子查询,并把最终结果返回给用户。

参考文献

【1】https://support.lucidworks.com/hc/en-us/articles/206248037-Solr-JAVA-CloudSolrClient-Getting-Started

【2】http://josh-persistence.iteye.com/blog/2234411

原文地址:https://www.cnblogs.com/davidwang456/p/4971298.html