MySQL Replication--参数binlog_transaction_dependency_tracking代码

WRITESET_SESSION 的特殊点:每个会话需要额外保存上一个事务的 sequence_number

/*
  Per-client state that is carried from one transaction to the next so that
  logical timestamps based on WRITESET_SESSION can be computed.

  The only piece of state is the sequence number most recently recorded for
  the session; it is 0 until set_last_session_sequence_number() is called.
*/
class Dependency_tracker_ctx {
 public:
  Dependency_tracker_ctx() : m_last_session_sequence_number(0) {}

  /* Record sequence_number as the latest one seen for this session. */
  void set_last_session_sequence_number(int64 sequence_number) {
    m_last_session_sequence_number = sequence_number;
  }

  /*
    Return the sequence number stored by the last call to the setter (0 if
    the setter was never called). Declared const because it only reads the
    member, which lets callers holding a const reference use it as well.
  */
  int64 get_last_session_sequence_number() const {
    return m_last_session_sequence_number;
  }

 private:
  /* Last sequence number recorded for this session; starts at 0. */
  int64 m_last_session_sequence_number;
};

计算依赖入口


/**
  Main entry point of the dependency tracking work: computes the
  sequence_number and commit_parent of the transaction held by thd.

  Both outputs are reset to 0 first. The commit-order tracker always runs;
  depending on the configured tracking mode its result is then passed through
  the writeset tracker and, for WRITESET_SESSION, through the session tracker
  as well.

  @param[in]  thd             Current THD, forwarded to the trackers.
  @param[out] sequence_number Sequence number of the current transaction.
  @param[out] commit_parent   Commit parent of the current transaction.
*/
void Transaction_dependency_tracker::get_dependency(THD *thd,
                                                    int64 &sequence_number,
                                                    int64 &commit_parent) {
  commit_parent = 0;
  sequence_number = 0;

  /* Which of the optional refinement stages the current mode asks for. */
  bool refine_with_writeset = false;
  bool refine_with_session = false;

  switch (m_opt_tracking_mode) {
    case DEPENDENCY_TRACKING_COMMIT_ORDER:
      break;
    case DEPENDENCY_TRACKING_WRITESET:
      refine_with_writeset = true;
      break;
    case DEPENDENCY_TRACKING_WRITESET_SESSION:
      refine_with_writeset = true;
      refine_with_session = true;
      break;
    default:
      /*
        Unknown mode: blow up on debug builds, and fall back to plain
        commit order on production builds (no refinement stage enabled).
      */
      DBUG_ASSERT(0);
      break;
  }

  /* Every mode starts from the commit-order result. */
  m_commit_order.get_dependency(thd, sequence_number, commit_parent);

  if (refine_with_writeset)
    m_writeset.get_dependency(thd, sequence_number, commit_parent);

  if (refine_with_session)
    m_writeset_session.get_dependency(thd, sequence_number, commit_parent);
}

可以发现,WRITESET 和 WRITESET_SESSION 模式都会先调用 COMMIT_ORDER 的 get_dependency 函数,再在其计算结果的基础上进一步调整 commit_parent。

COMMIT_ORDER计算依赖

/**
  Compute the sequence_number of a transaction and its last_commit
  (commit_parent) from the commit-order logical clock.

  @param[in]     thd             Current THD from which to extract trx context.
  @param[in,out] sequence_number Receives the transaction's sequence number,
                                 relative to the current binlog.
  @param[in,out] commit_parent   Receives the transaction's commit parent,
                                 relative to the current binlog, or SEQ_UNINIT
                                 when the parent is not in the current binlog.
*/
void Commit_order_trx_dependency_tracker::get_dependency(THD *thd,
                                                         int64 &sequence_number,
                                                         int64 &commit_parent) {
  Transaction_ctx *const trx = thd->get_transaction();

  DBUG_ASSERT(trx->sequence_number >
              m_max_committed_transaction.get_offset());

  /*
    Both values are reported relative to the current binlog, which is done
    by subtracting the binlog's clock offset from them.
  */
  sequence_number =
      trx->sequence_number - m_max_committed_transaction.get_offset();

  /*
    A transaction that commits after the binlog is rotated can have its
    commit parent in the previous binlog. Subtracting the offset would then
    give a non-positive number and the dependency is lost, so SEQ_UNINIT is
    logged instead.
  */
  const bool parent_in_current_binlog =
      trx->last_committed > m_max_committed_transaction.get_offset();

  if (parent_in_current_binlog) {
    commit_parent =
        std::max(trx->last_committed, m_last_blocking_transaction) -
        m_max_committed_transaction.get_offset();
  } else {
    commit_parent = SEQ_UNINIT;
  }

  /*
    A transaction flagged as unsafe for a parallel slave becomes the lower
    bound for the commit parent of the transactions computed after it.
  */
  if (is_trx_unsafe_for_parallel_slave(thd)) {
    m_last_blocking_transaction = trx->sequence_number;
  }
}

WRITESET计算依赖

/**
  Lower the commit_parent of a transaction using writesets.

  The commit_parent must already have been filled in by
  Commit_order_trx_dependency_tracker. The row hashes of the transaction are
  looked up in the writeset history to find the most recent conflicting
  transaction, and the commit_parent is reduced to that value when it is
  smaller. How far back conflicts can be found depends on how many row hashes
  the history holds; the history is cleared once it would exceed the
  user-defined maximum.

  @param[in]     thd             Current THD from which to extract trx context.
  @param[in,out] sequence_number Sequence number of current transaction.
  @param[in,out] commit_parent   Commit_parent of current transaction,
                                 pre-filled with the commit_parent calculated by
                                 Commit_order_trx_dependency_tracker, which is
                                 kept when the writeset commit_parent is not
                                 valid.
*/
void Writeset_trx_dependency_tracker::get_dependency(THD *thd,
                                                     int64 &sequence_number,
                                                     int64 &commit_parent) {
  Rpl_transaction_write_set_ctx *ws_ctx =
      thd->get_transaction()->get_transaction_write_set_ctx();
  std::vector<uint64> *row_hashes = ws_ctx->get_write_set();

#ifndef DBUG_OFF
  /* An empty transaction must come with an empty writeset. */
  DBUG_ASSERT(!is_empty_transaction_in_binlog_cache(thd) ||
              row_hashes->empty());
#endif

  /*
    Decide whether writesets may be used at all. Each step only evaluates
    its condition while the previous ones held.
  */
  // An empty writeset implies DDL or similar, unless keys are missing or the
  // transaction itself is empty (empty transactions can run in parallel and
  // therefore need not clear the history).
  bool can_use_writesets = !row_hashes->empty() ||
                           ws_ctx->get_has_missing_keys() ||
                           is_empty_transaction_in_binlog_cache(thd);
  // The session must hash rows with the same algorithm as the rows that are
  // already in the history.
  can_use_writesets =
      can_use_writesets &&
      (global_system_variables.transaction_write_set_extraction ==
       thd->variables.transaction_write_set_extraction);
  // Changes must not cascade to other tables through foreign keys.
  can_use_writesets =
      can_use_writesets && !ws_ctx->get_has_related_foreign_keys();
  // The writeset must not have hit its size limit already.
  can_use_writesets =
      can_use_writesets && !ws_ctx->was_write_set_limit_reached();

  if (!can_use_writesets) {
    /*
      Keep the COMMIT_ORDER commit_parent and restart the history from this
      transaction to keep data consistent.
    */
    m_writeset_history_start = sequence_number;
    m_writeset_history.clear();
    return;
  }

  /*
    If adding this transaction would overflow the history, the history is
    still consulted for the current transaction and only cleared afterwards.
  */
  const bool exceeds_capacity =
      m_writeset_history.size() + row_hashes->size() > m_opt_max_history_size;

  /*
    Find the greatest sequence_number among all conflicting entries while
    recording this transaction as the latest writer of each of its rows.
  */
  int64 last_parent = m_writeset_history_start;
  for (const uint64 hash : *row_hashes) {
    auto entry = m_writeset_history.find(hash);

    if (entry == m_writeset_history.end()) {
      /* Unknown row: remember it, unless the history is about to be reset. */
      if (!exceeds_capacity) {
        m_writeset_history.insert(
            std::pair<uint64, int64>(hash, sequence_number));
      }
      continue;
    }

    if (entry->second > last_parent && entry->second < sequence_number) {
      last_parent = entry->second;
    }
    entry->second = sequence_number;
  }

  /*
    With missing primary keys the COMMIT_ORDER value is kept. The history was
    still updated above and is not reset for that reason, as that is
    unnecessary: any transaction that refers to such a table will also revert
    to COMMIT_ORDER.

    Otherwise the commit_parent becomes the minimum of the largest parent
    found through the row hashes and the COMMIT_ORDER commit parent.
  */
  if (!ws_ctx->get_has_missing_keys()) {
    commit_parent = std::min(last_parent, commit_parent);
  }

  if (exceeds_capacity) {
    m_writeset_history_start = sequence_number;
    m_writeset_history.clear();
  }
}

WRITESET_SESSION计算依赖

/**
  Adjust the writeset commit parent of a transaction using the dependency on
  the previous transaction of the same session.

  @param[in]     thd             Current THD from which to extract trx context.
  @param[in,out] sequence_number Sequence number of current transaction.
  @param[in,out] commit_parent   Commit_parent of current transaction,
                                 pre-filled with the commit_parent calculated
                                 by the Writeset_trx_dependency_tracker as a
                                 fall-back.
*/
void Writeset_session_trx_dependency_tracker::get_dependency(
    THD *thd, int64 &sequence_number, int64 &commit_parent) {
  auto &session_ctx = thd->rpl_thd_ctx.dependency_tracker_ctx();

  const int64 session_parent = session_ctx.get_last_session_sequence_number();

  /*
    A stored value of 0, or one that is not below the current sequence
    number, is ignored. Otherwise the commit parent is raised so that it is
    never lower than the session's previous transaction.
  */
  const bool session_parent_usable =
      session_parent != 0 && session_parent < sequence_number;
  if (session_parent_usable && commit_parent < session_parent) {
    commit_parent = session_parent;
  }

  /* The current transaction becomes the session's previous one. */
  session_ctx.set_last_session_sequence_number(sequence_number);
}


原文地址:https://www.cnblogs.com/gaogao67/p/14643583.html