lucene源码分析(8)MergeScheduler

1.使用IndexWriter.java

    mergeScheduler.merge(this, MergeTrigger.EXPLICIT, newMergesFound);

2.定义MergeScheduler

/** <p>Expert: {@link IndexWriter} uses an instance
 *  implementing this interface to execute the merges
 *  selected by a {@link MergePolicy}.  The default
 *  MergeScheduler is {@link ConcurrentMergeScheduler}.</p>
 * @lucene.experimental
*/

3.MergeTrigger 触发merge的事件

/**
 * Identifies the event that caused a merge to be requested. A value of this
 * type is handed to
 * {@link MergePolicy#findMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}.
 */
public enum MergeTrigger {
  /** A segment flush caused the merge. */
  SEGMENT_FLUSH,

  /**
   * A full flush caused the merge. A full flush can result from a commit,
   * an NRT reader reopen, or a close call on the index writer.
   */
  FULL_FLUSH,

  /** The user explicitly asked for the merge. */
  EXPLICIT,

  /** A merge that finished successfully caused this merge. */
  MERGE_FINISHED,

  /** An IndexWriter that is closing caused the merge. */
  CLOSING
}

4.ConcurrentMergeScheduler默认实现

/** A {@link MergeScheduler} that runs each merge using a
 *  separate thread.
 *
 *  <p>Specify the max number of threads that may run at
 *  once, and the maximum number of simultaneous merges
 *  with {@link #setMaxMergesAndThreads}.</p>
 *
 *  <p>If the number of merges exceeds the max number of threads 
 *  then the largest merges are paused until one of the smaller
 *  merges completes.</p>
 *
 *  <p>If more than {@link #getMaxMergeCount} merges are
 *  requested then this class will forcefully throttle the
 *  incoming threads by pausing until one or more merges
 *  complete.</p>
 *
 *  <p>This class attempts to detect whether the index is
 *  on rotational storage (traditional hard drive) or not
 *  (e.g. solid-state disk) and changes the default max merge
 *  and thread count accordingly.  This detection is currently
 *  Linux-only, and relies on the OS to put the right value
 *  into /sys/block/&lt;dev&gt;/block/rotational.  For all
 *  other operating systems it currently assumes a rotational
 *  disk for backwards compatibility.  To enable default
 *  settings for spinning or solid state disks for such
 *  operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}.
 */ 

5.MergeThread执行merge任务

    /**
     * Thread body: performs this thread's merge, then asks the scheduler to
     * look for follow-up merges, and finally de-registers the thread and
     * wakes anything waiting on the scheduler's monitor.
     */
    @Override
    public void run() {
      try {
        if (verbose()) {
          message("  merge thread: start");
        }

        doMerge(writer, merge);

        if (verbose()) {
          message("  merge thread: done");
        }

        // Give the scheduler a chance to kick off further merges now that
        // this one is done.
        try {
          merge(writer, MergeTrigger.MERGE_FINISHED, true);
        } catch (AlreadyClosedException alreadyClosed) {
          // Deliberately ignored.
        } catch (IOException ioFailure) {
          throw new RuntimeException(ioFailure);
        }

      } catch (Throwable failure) {

        // An aborted merge is silently ignored. Any other failure is
        // reported unless suppressExceptions is set (normally only during
        // testing).
        final boolean aborted = failure instanceof MergePolicy.MergeAbortedException;
        if (!aborted && !suppressExceptions) {
          handleMergeException(writer.getDirectory(), failure);
        }

      } finally {
        synchronized (ConcurrentMergeScheduler.this) {
          removeMergeThread();

          updateMergeThreads();

          // Indexing may have been stalled waiting on merges; wake the
          // waiters so they can re-check and possibly continue.
          ConcurrentMergeScheduler.this.notifyAll();
        }
      }
    }
  }

merge过程

 /**
   * Merges the indicated segments, replacing them in the stack with a
   * single segment.
   * 
   * @lucene.experimental
   */
  public void merge(MergePolicy.OneMerge merge) throws IOException {

    boolean success = false;

    final long t0 = System.currentTimeMillis();

    final MergePolicy mergePolicy = config.getMergePolicy();
    try {
      try {
        try {
          mergeInit(merge);

          if (infoStream.isEnabled("IW")) {
            infoStream.message("IW", "now merge
  merge=" + segString(merge.segments) + "
  index=" + segString());
          }

          mergeMiddle(merge, mergePolicy);
          mergeSuccess(merge);
          success = true;
        } catch (Throwable t) {
          handleMergeException(t, merge);
        }
      } finally {
        synchronized(this) {

          mergeFinish(merge);

          if (success == false) {
            if (infoStream.isEnabled("IW")) {
              infoStream.message("IW", "hit exception during merge");
            }
          } else if (!merge.isAborted() && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) {
            // This merge (and, generally, any change to the
            // segments) may now enable new merges, so we call
            // merge policy & update pending merges.
            updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments);
          }
        }
      }
    } catch (Throwable t) {
      // Important that tragicEvent is called after mergeFinish, else we hang
      // waiting for our merge thread to be removed from runningMerges:
      tragicEvent(t, "merge");
      throw t;
    }

    if (merge.info != null && merge.isAborted() == false) {
      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.maxDoc() + " docs");
      }
    }
  }

6.merge结束

  /**
   * Does the finishing work for a merge. This is fast, but it holds the
   * synchronized lock on the IndexWriter instance while it runs.
   */
  final synchronized void mergeFinish(MergePolicy.OneMerge merge) {

    // Callers of forceMerge, addIndexes or waitForMerges may be blocked
    // waiting for merges to finish, so wake them.
    notifyAll();

    // This method may run twice for the same merge (e.g. after an exception
    // inside mergeInit), so the segments are only released while the
    // registerDone flag is still set.
    if (merge.registerDone) {
      for (SegmentCommitInfo mergedSegment : merge.segments) {
        mergingSegments.remove(mergedSegment);
      }
      merge.registerDone = false;
    }

    runningMerges.remove(merge);
  }
原文地址:https://www.cnblogs.com/davidwang456/p/10059134.html