Copycat

client.submit(new PutCommand("foo", "Hello world!"));

ServerContext

connection.handler(CommandRequest.class, request -> state.command(request));

State.command

ReserveState开始,会把command forward到leader,只有leader可以处理command

/**
   * Handles a command request by handing it to the current leader.
   * <p>
   * When no leader is known the request is answered immediately with a
   * {@code NO_LEADER_ERROR}. Otherwise the request is forwarded; a failed
   * forward is also reported to the caller as {@code NO_LEADER_ERROR}.
   *
   * @param request the command request received from the client
   * @return a future completed with the (logged) command response
   */
  @Override
  public CompletableFuture<CommandResponse> command(CommandRequest request) {
    context.checkThread();
    logRequest(request);

    // A leader is known: forward the request and translate any failure into an error response.
    if (context.getLeader() != null) {
      return this.<CommandRequest, CommandResponse>forward(request)
        .exceptionally(failure -> CommandResponse.builder()
          .withStatus(Response.Status.ERROR)
          .withError(CopycatError.Type.NO_LEADER_ERROR)
          .build())
        .thenApply(this::logResponse);
    }

    // No leader: fail fast without touching the network.
    CommandResponse noLeader = CommandResponse.builder()
      .withStatus(Response.Status.ERROR)
      .withError(CopycatError.Type.NO_LEADER_ERROR)
      .build();
    return CompletableFuture.completedFuture(logResponse(noLeader));
  }

LeaderState.Command

public CompletableFuture<CommandResponse> command(final CommandRequest request) {
    context.checkThread();
    logRequest(request);

    // Resolve the server-side session that the request claims to belong to.
    final ServerSessionContext clientSession =
      context.getStateMachine().executor().context().sessions().getSession(request.session());

    // Known session: sequence the command; the future is completed later by the sequencing path.
    if (clientSession != null) {
      final ComposableFuture<CommandResponse> pending = new ComposableFuture<>();
      sequenceCommand(request, clientSession, pending);
      return pending;
    }

    // Unknown session: the command cannot be handled, answer with an unknown session error.
    CommandResponse unknownSession = CommandResponse.builder()
      .withStatus(Response.Status.ERROR)
      .withError(CopycatError.Type.UNKNOWN_SESSION_ERROR)
      .build();
    return CompletableFuture.completedFuture(logResponse(unknownSession));
  }

sequenceCommand

/**
   * Sequences the given command to the log.
   * <p>
   * Commands are handled in the order of their session sequence numbers:
   * <ul>
   *   <li>If the request's sequence number is greater than {@code session.nextRequestSequence()}, an earlier
   *       request has not been handled yet. The request is queued on the session unless it is more than
   *       {@code MAX_REQUEST_QUEUE_SIZE} ahead of {@code session.getRequestSequence()}, in which case it is
   *       rejected with a {@code COMMAND_ERROR}.</li>
   *   <li>Otherwise the command is applied to the log immediately.</li>
   * </ul>
   *
   * @param request the command request to sequence
   * @param session the server session the request belongs to
   * @param future the future to complete with the command response
   */
  private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
    // Requests of a session must be handled in sequence order, so a sequence number larger than the
    // expected one means this request has to wait for the requests before it.
    if (request.sequence() > session.nextRequestSequence()) {
      // The request is too far ahead of the last sequenced request: reject it instead of queueing it.
      if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) {
        // Log the rejection like every other error response produced by this state.
        future.complete(logResponse(CommandResponse.builder()
          .withStatus(Response.Status.ERROR)
          .withError(CopycatError.Type.COMMAND_ERROR)
          .build()));
      }
      // Register the request in the request queue if it's not too far ahead of the current sequence number.
      else {
        session.registerRequest(request.sequence(), () -> applyCommand(request, session, future));
      }
    } else {
      // The request is the expected one (or an earlier one): append it to the log right away.
      applyCommand(request, session, future);
    }
  }

如果command的sequence比期望的大,就通过session.registerRequest把它放入队列等待:

session.registerRequest

ServerSessionContext

  /**
   * Registers a callback to run for the request with the given sequence number.
   *
   * @param sequence the request sequence number the callback is keyed by
   * @param runnable the callback to store
   * @return this session context, for chaining
   */
  ServerSessionContext registerRequest(long sequence, Runnable runnable) {
    this.commands.put(sequence, runnable);
    return this;
  }

可以看到会把sequence id和对应的function注册到commands里面,这里的function就是applyCommand

问题是,这个commands会在什么时候被触发执行?

ServerSessionContext setRequestSequence(long request) {
    // Only ever move the request sequence forward; older or equal values are ignored.
    if (request <= this.requestSequence) {
      return this;
    }
    this.requestSequence = request;

    // The sequence advanced: if a callback is queued for the next request, pull it out and run it so
    // that queued commands are evaluated in order.
    Runnable queued = this.commands.remove(nextRequestSequence());
    if (queued != null) {
      queued.run();
    }
    return this;
  }

答案是在调用setRequestSequence的时候:

当set的时候,去commands里面看下,是否有下一个request在等待,如果有直接执行掉

applyCommand

/**
   * Appends the given command to the log and starts replicating it.
   *
   * @param request the command request being applied
   * @param session the session the request belongs to
   * @param future the future to complete with the command response
   */
  private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
    final Command operation = request.command();
    final long currentTerm = context.getTerm();
    final long now = System.currentTimeMillis();

    // Build a CommandEntry, write it to the log and remember the index it was appended at.
    final long appendedIndex;
    try (CommandEntry logEntry = context.getLog().create(CommandEntry.class)) {
      logEntry.setTerm(currentTerm)
        .setSession(request.session())
        .setTimestamp(now)
        .setSequence(request.sequence())
        .setCommand(operation);
      appendedIndex = context.getLog().append(logEntry);
      LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), logEntry);
    }

    // Send the new entry to the followers.
    appendCommand(appendedIndex, future);

    // Record this request as the session's last processed one; the session then checks whether the
    // next request is already queued and, if so, runs it.
    session.setRequestSequence(request.sequence());
  }

appendCommand

/**
   * Sends append requests for a command to followers.
   * <p>
   * Once the append up to {@code index} completes, the command is applied to the state machine on success,
   * or the future is completed with an {@code INTERNAL_ERROR} response on failure. If this state is no
   * longer open when the append completes, the callback does nothing.
   *
   * @param index the log index of the command entry
   * @param future the future to complete with the command response
   */
  private void appendCommand(long index, CompletableFuture<CommandResponse> future) {
    appender.appendEntries(index).whenComplete((commitIndex, commitError) -> {
      context.checkThread();

      // Nothing to do once this state has been closed.
      if (!isOpen()) {
        return;
      }

      // The append failed: report an internal error to the caller.
      if (commitError != null) {
        future.complete(logResponse(CommandResponse.builder()
          .withStatus(Response.Status.ERROR)
          .withError(CopycatError.Type.INTERNAL_ERROR)
          .build()));
        return;
      }

      // The append succeeded: apply the command to the state machine.
      applyCommand(index, future);
    });
  }

applyCommand,函数名不能换换吗

  /**
   * Applies the command at the given log index to the state machine and completes the response future.
   *
   * @param index the log index of the command entry to apply
   * @param future the future to complete with the command response
   */
  private void applyCommand(long index, CompletableFuture<CommandResponse> future) {
    context.getStateMachine().<ServerStateMachine.Result>apply(index).whenComplete((outcome, failure) -> {
      // Skip completion entirely if this state has been closed in the meantime.
      if (!isOpen()) {
        return;
      }
      completeOperation(outcome, CommandResponse.builder(), failure, future);
    });
  }

apply,我收到command首先要把它写到log里面,然后同步给follower,最终,需要去执行command,比如修改状态机里面的值,a=1

ServerContext.getStateMachine(),返回

private ServerStateMachine stateMachine;
 
这里调用ServerStateMachine.apply(index)
调用apply(entry)
调用apply((CommandEntry) entry)
/**
   * Applies a command entry to the state machine.
   * <p>
   * The session and sequence checks run on the calling thread; the command itself (or the lookup of a
   * previously cached result) is handed to the state machine executor. The caller's thread context is
   * captured up front and passed along with that work.
   *
   * @param entry the command entry to apply
   * @return a future for the command result; completed exceptionally with an
   *         {@code UnknownSessionException} when the session is missing or inactive
   */
private CompletableFuture<Result> apply(CommandEntry entry) {
    final CompletableFuture<Result> future = new CompletableFuture<>();
    final ThreadContext context = ThreadContext.currentContextOrThrow(); // keep a reference to the calling thread's context

    // First check to ensure that the session exists.
    ServerSessionContext session = executor.context().sessions().getSession(entry.getSession());

    // If the session is null, return an UnknownSessionException. Commands applied to the state machine must
    // have a session. We ensure that session register/unregister entries are not compacted from the log
    // until all associated commands have been cleaned.
    if (session == null) { // the session does not exist
      log.release(entry.getIndex());
      return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getSession()));
    }
    // If the session is not in an active state, return an UnknownSessionException. Sessions are retained in the
    // session registry until all prior commands have been released by the state machine, but new commands can
    // only be applied for sessions in an active state.
    else if (!session.state().active()) { // the session is not in an active state
      log.release(entry.getIndex());
      return Futures.exceptionalFuture(new UnknownSessionException("inactive session: " + entry.getSession()));
    }
    // If the command's sequence number is less than the next session sequence number then that indicates that
    // we've received a command that was previously applied to the state machine. Ensure linearizability by
    // returning the cached response instead of applying it to the user defined state machine.
    else if (entry.getSequence() > 0 && entry.getSequence() < session.nextCommandSequence()) { // entry was already applied
      // Ensure the response check is executed in the state machine thread in order to ensure the
      // command was applied, otherwise there will be a race condition and concurrent modification issues.
      long sequence = entry.getSequence();

      // Switch to the state machine thread and get the existing response.
      executor.executor().execute(() -> sequenceCommand(sequence, session, future, context)); // return the result of the earlier apply
      return future;
    }
    // If we've made it this far, the command must have been applied in the proper order as sequenced by the
    // session. This should be the case for most commands applied to the state machine.
    else {
      // Allow the executor to execute any scheduled events.
      long index = entry.getIndex();
      long sequence = entry.getSequence();

      // Calculate the updated timestamp for the command.
      long timestamp = executor.timestamp(entry.getTimestamp());

      // Execute the command in the state machine thread. Once complete, the CompletableFuture callback will be completed
      // in the state machine thread. Register the result in that thread and then complete the future in the caller's thread.
      ServerCommit commit = commits.acquire(entry, session, timestamp); // NOTE(review): author's note says commits is a ServerCommitPool that reuses ServerCommit objects instead of allocating new ones - pool not visible here
      executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

      // Update the last applied index prior to the command sequence number. This is necessary to ensure queries sequenced
      // at this index receive the index of the command.
      setLastApplied(index);

      // Update the session timestamp and command sequence number. This is done in the caller's thread since all
      // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
      session.setTimestamp(timestamp).setCommandSequence(sequence);
      return future;
    }
  }
executeCommand
ServerCommit commit = commits.acquire(entry, session, timestamp);
executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

注意这里有两个线程,

一个是context,是

ThreadContext threadContext

用来响应server请求的

还有一个是executor里面的stateContext,用来改变stateMachine的状态的

所以这里是用executor来执行executeCommand,但把ThreadContext传入

/**
   * Executes a state machine command.
   * <p>
   * The operation's output, or the exception it threw, is wrapped in a {@code Result}, registered on the
   * session under the command's sequence number, and then used to complete {@code future} via the
   * executor of the supplied {@code context}.
   *
   * @param index the log index of the command
   * @param sequence the session sequence number of the command
   * @param timestamp the timestamp passed to the executor tick
   * @param commit the commit handed to the state machine operation
   * @param session the session that submitted the command
   * @param future the future to complete with the result
   * @param context the thread context on which the future is completed
   */
  private void executeCommand(long index, long sequence, long timestamp, ServerCommit commit, ServerSessionContext session, CompletableFuture<Result> future, ThreadContext context) {

    // Trigger scheduled callbacks in the state machine.
    executor.tick(index, timestamp);

    // Update the state machine context with the commit index and local server context. The synchronous flag
    // indicates whether the server expects linearizable completion of published events. Events will be published
    // based on the configured consistency level for the context.
    executor.init(commit.index(), commit.time(), ServerStateMachineContext.Type.COMMAND);

    // Store the event index to return in the command response.
    long eventIndex = session.getEventIndex();

    try {
      // Execute the state machine operation and get the result.
      Object output = executor.executeOperation(commit);

      // Once the operation has been applied to the state machine, commit events published by the command.
      // The state machine context will build a composite future for events published to all sessions.
      executor.commit();

      // Store the result for linearizability and complete the command.
      Result result = new Result(index, eventIndex, output);
      session.registerResult(sequence, result); // cache the execution result on the session
      context.executor().execute(() -> future.complete(result)); // complete the future via the supplied context's executor
    } catch (Exception e) {
      // If an exception occurs during execution of the command, store the exception.
      Result result = new Result(index, eventIndex, e);
      session.registerResult(sequence, result);
      context.executor().execute(() -> future.complete(result));
    }
  }
ServerStateMachineExecutor.tick
根据时间,去触发scheduledTasks中已经到时间的task
 
ServerStateMachineExecutor.init
更新state machine的context
/**
   * Updates the state machine context by passing the given values to {@code context.update}.
   *
   * @param index the index to set on the context
   * @param instant the instant to set on the context's clock
   * @param type the type to set on the context
   */
void init(long index, Instant instant, ServerStateMachineContext.Type type) {
    context.update(index, instant, type);
  }
  
  //ServerStateMachineContext
  /**
   * Stores the given index and type on this context and sets its clock to the given instant.
   *
   * @param index the new index
   * @param instant the instant to set the clock to
   * @param type the new type
   */
  void update(long index, Instant instant, Type type) {
    this.index = index;
    this.type = type;
    clock.set(instant);
  }
 
ServerStateMachineExecutor.executeOperation
<T extends Operation<U>, U> U executeOperation(Commit commit) {
    // Start with the function registered for the commit's exact type.
    Function handler = operations.get(commit.type());

    if (handler == null) {
      // Nothing registered for the exact type: fall back to the first registration whose type is a
      // supertype of the commit's type.
      for (Map.Entry<Class, Function> registration : operations.entrySet()) {
        if (registration.getKey().isAssignableFrom(commit.type())) {
          handler = registration.getValue();
          break;
        }
      }

      // Still nothing: the operation is unknown to this state machine.
      if (handler == null) {
        throw new IllegalStateException("unknown state machine operation: " + commit.type());
      }

      // Remember the match under the exact type so the next lookup is direct.
      operations.put(commit.type(), handler);
    }

    // Run the operation; any exception it throws is wrapped in an ApplicationException.
    try {
      return (U) handler.apply(commit);
    } catch (Exception e) {
      throw new ApplicationException(e, "An application error occurred");
    }
  }
 
 
RequestSequence 和 CommandSequence有什么不同的,看看都在什么地方用了?
 

RequestSequence

Set

ServerStateMachine

private CompletableFuture<Void> apply(KeepAliveEntry entry) {
// ... (rest of the method is elided in this excerpt; commandSequence is presumably computed there)
  // Update the session keep alive index for log cleaning, then advance the session's request
  // sequence to commandSequence.
session.setKeepAliveIndex(entry.getIndex()).setRequestSequence(commandSequence);
}

LeaderState

private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
// ... (earlier statements of the method are elided in this excerpt)
  // Set the last processed request for the session. This will cause sequential command callbacks to be executed.
session.setRequestSequence(request.sequence());
}
 

Get

ServerSessionContext.setCommandSequence
// If the request sequence number is less than the applied sequence number, update the request
    // sequence number. This is necessary to ensure that if the local server is a follower that is
    // later elected leader, its sequences are consistent for commands.
    if (sequence > requestSequence) {
      // Only attempt to trigger command callbacks if any are registered.
      if (!this.commands.isEmpty()) {
        // For each request sequence number, a command callback completing the command submission may exist.
        for (long i = this.requestSequence + 1; i <= sequence; i++) {
          this.requestSequence = i;
          Runnable command = this.commands.remove(i);
          if (command != null) {
            command.run();
          }
        }
      } else {
        this.requestSequence = sequence;
      }
    }

LeaderState

/**
   * Sequences the given command to the log.
   */
  private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
    // If the command is LINEARIZABLE and the session's current sequence number is less then one prior to the request
    // sequence number, queue this request for handling later. We want to handle command requests in the order in which
    // they were sent by the client. Note that it's possible for the session sequence number to be greater than the request
    // sequence number. In that case, it's likely that the command was submitted more than once to the
    // cluster, and the command will be deduplicated once applied to the state machine.
    if (request.sequence() > session.nextRequestSequence()) {
      // If the request sequence number is more than 1k requests above the last sequenced request, reject the request.
      // The client should resubmit a request that fails with a COMMAND_ERROR.
      if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) {

CommandSequence

Set

ServerSessionContext.setCommandSequence

for (long i = commandSequence + 1; i <= sequence; i++) {
      commandSequence = i;
      List<Runnable> queries = this.sequenceQueries.remove(commandSequence);
      if (queries != null) {
        for (Runnable query : queries) {
          query.run();
        }
        queries.clear();
        queriesPool.add(queries);
      }
    }

ServerStateMachine

private CompletableFuture<Result> apply(CommandEntry entry)
// Update the session timestamp and command sequence number. This is done in the caller's thread since all
// timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
session.setTimestamp(timestamp).setCommandSequence(sequence);

Get

LeaderState

sequenceLinearizableQuery, sequenceBoundedLinearizableQuery
/**
   * Sequences a bounded linearizable query.
   * <p>
   * The query is applied right away unless its sequence number is ahead of the session's command
   * sequence, in which case it is registered on the session to be applied later.
   *
   * @param entry the query entry
   * @param session the session that submitted the query
   * @param future the future to complete with the query response
   */
  private void sequenceBoundedLinearizableQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) {
    // The session's command sequence has already reached the query's sequence number: apply now.
    if (entry.getSequence() <= session.getCommandSequence()) {
      applyQuery(entry, future);
      return;
    }

    // Otherwise park the query on the session under its sequence number.
    session.registerSequenceQuery(entry.getSequence(), () -> applyQuery(entry, future));
  }

PassiveState

private void sequenceQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) {
    // The session's command sequence has already reached the query's sequence number: index it now.
    if (entry.getSequence() <= session.getCommandSequence()) {
      indexQuery(entry, future);
      return;
    }

    // Otherwise register the query on the session under its sequence number for later handling.
    session.registerSequenceQuery(entry.getSequence(), () -> indexQuery(entry, future));
  }
原文地址:https://www.cnblogs.com/fxjwind/p/6485254.html