Spring的事务实现原理

主流程

Spring的事务采用AOP的方式实现。

@Transactional 注解的属性信息

name                当在配置文件中有多个 TransactionManager , 可以用该属性指定选择哪个事务管理器

propagation      事务的传播行为,默认值为 REQUIRED。

isolation            事务的隔离度,默认值采用 DEFAULT。

timeout              事务的超时时间,默认值为-1。如果超过该时间限制但事务还没有完成,则自动回滚事务。

read-only           指定事务是否为只读事务,默认值为 false;为了忽略那些不需要事务的方法,比如读取数据,可以设置 read-only 为 true。

rollback-for         用于指定能够触发事务回滚的异常类型,如果有多个异常类型需要指定,各类型之间可以通过逗号分隔。

no-rollback- for    抛出 no-rollback-for 指定的异常类型,不回滚事务。

除此以外,@Transactional 注解也可以添加到类级别上。当把@Transactional 注解放在类级别时,表示所有该类的公共方法都配置相同的事务属性信息。见清单 2,EmployeeService 的所有方法都支持事务并且是只读。当类级别配置了@Transactional,方法级别也配置了@Transactional,应用程序会以方法级别的事务属性信息来管理事务,换言之,方法级别的事务属性信息会覆盖类级别的相关配置信息。

我们从TransactionAspectSupport这个类开始f分析。

  1. 获取事务的属性(@Transactional注解中的配置)
  2. 加载配置中的TransactionManager.
  3. 获取收集事务信息TransactionInfo
  4. 执行目标方法
  5. 出现异常,尝试处理。
  6. 清理事务相关信息
  7. 提交事务
//1. 获取@Transactional注解的相关参数
TransactionAttributeSource tas = getTransactionAttributeSource();
// 2. 获取事务管理器
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // 3. 获取TransactionInfo,包含了tm和TransactionStatus
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // 4.执行目标方法
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
           //5.回滚
            // target invocation exception
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
          // 6. 清理当前线程的事务相关信息
            cleanupTransactionInfo(txInfo);
        }
        // 提交事务
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
View Code

        在执行createTransactionIfNecessary获取事务状态时,就准备好了开启事务的所有内容,主要是执行了JDBC的con.setAutoCommit(false)方法。同时处理了很多和数据库连接相关的ThreadLocal变量。

关键对象介绍
PlatformTransactionManager
TransactionManager是做什么的?它保存着当前的数据源连接,对外提供对该数据源的事务提交回滚操作接口,同时实现了事务相关操作的方法。一个数据源DataSource需要一个事务管理器。

属性:DataSource

内部核心方法:
public commit 提交事务
public rollback 回滚事务
public getTransaction 获得当前事务状态

protected doSuspend 挂起事务

protected doBegin 开始事务,主要是执行了JDBC的con.setAutoCommit(false)方法。同时处理了很多和数据库连接相关的ThreadLocal变量。

protected doCommit 提交事务
protected doRollback 回滚事务
protected doGetTransaction() 获取事务信息
final getTransaction 获取事务状态

获取对应的TransactionManager

@Nullable
    protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
        // Do not attempt to lookup tx manager if no tx attributes are set
        if (txAttr == null || this.beanFactory == null) {
            return getTransactionManager();
        }

        String qualifier = txAttr.getQualifier();
        //如果指定了Bean则取指定的PlatformTransactionManager类型的Bean
        if (StringUtils.hasText(qualifier)) {
            return determineQualifiedTransactionManager(this.beanFactory, qualifier);
        }
        //如果指定了Bean的名称,则根据bean名称获取对应的bean
        else if (StringUtils.hasText(this.transactionManagerBeanName)) {
            return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
        }
        else {
        // 默认取一个PlatformTransactionManager类型的Bean
            PlatformTransactionManager defaultTransactionManager = getTransactionManager();
            if (defaultTransactionManager == null) {
                defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
                if (defaultTransactionManager == null) {
                    defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
                    this.transactionManagerCache.putIfAbsent(
                            DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
                }
            }
            return defaultTransactionManager;
        }
    }
View Code

可以看到,如果没有指定TransactionManager参数的话,会使用默认的一个实现,所以当程序中有多个数据库时,事务执行最好是指定事务管理器。

1.核心属性:事务状态TransactionStatus
2.事务管理器
3.事务属性
4.上一个事务信息oldTransactionInfo,REQUIRE_NEW传播级别时,事务挂起后前一个事务的事务信息

当前事务状态TransactionStatus
通过TransactionManager的getTransaction方法,获取当前事务的状态。
具体是在AbstractPlatformTransactionManager中实现.
TransactionStatus被用来做什么:TransactionManager对事务进行提交或回滚时需要用到该对象
具体方法有:

作用:

判断当前事务是否是一个新的事务,否则加入到一个已经存在的事务中。事务传播级别REQUIRED和REQUIRE_NEW有用到。
当前事务是否携带保存点,嵌套事务用到。
setRollbackOnly,isRollbackOnly,当子事务回滚时,并不真正回滚事务,而是对子事务设置一个标志位。
事务是否已经完成,已经提交或者已经回滚。

传播级别
介绍
Spring事务的传播级别描述的是多个使用了@Transactional注解的方法互相调用时,Spring对事务的处理。包涵的传播级别有:(在TransactionDefinition接口中定义)

7种事务传播行为如下:默认  propagation = PROPAGATION_REQUIRED

1.PROPAGATION_REQUIRED

  如果当前没有事务,就创建一个新事务,如果当前存在事务,就加入该事务,这是最常见的选择,也是Spring默认的事务传播行为。

2.PROPAGATION_SUPPORTS

  支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就以非事务执行。

如果外围事务回滚,内部事务也要回滚。

3.PROPAGATION_MANDATORY

  支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就抛出异常。

4.PROPAGATION_REQUIRES_NEW

  创建新事务,无论当前存不存在事务,都创建新事务。

5.PROPAGATION_NOT_SUPPORTED

  以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。

6.PROPAGATION_NEVER

  以非事务方式执行,如果当前存在事务,则抛出异常。

7.PROPAGATION_NESTED

  如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则按REQUIRED属性执行。

原理:

我们从两个角度看他们的实现原理,一个是刚进入事务时,针对不同的传播级别,它们的行为有什么区别。另一个角度是当事务提交或者回滚时,传播级别对事务行为的影响。

首先在尝试获取事务信息时,如果当前已经存在一个事务,则会根据传播级别做一些处理:

隔离级别对开始事务的影响(获取TransactionStatus)

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
  // 从当前的transactionManager获取DataSource对象
  // 然后以该DataSource对象为Key,
  // 去一个ThreadLocal变量中的map中获取该DataSource的连接
  // 然后设置到DataSourceTransactionObject中返回。
    Object transaction = doGetTransaction();
    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();
    if (definition == null) {
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }
// 如果当前线程已经在一个事务中了,则需要根据事务的传播级别
//来决定如何处理并获取事务状态对象
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    // Check definition settings for new transaction.
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
          //如果当前不在一个事务中,则执行事务的准备操作
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 构造事务状态对象,注意这里第三个参数为true,代表是一个新事务
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            //执行begin操作,核心操作是设置隔离级别,执行   conn.setAutoCommit(false); 同时将数据连接绑定到当前线程
            doBegin(transaction, definition);
            // 针对事务相关属性如隔离级别,是否在事务中,设置绑定到当前线程
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}
View Code

可以看到,如果当前不存在事务时,创建一个新的TransactionStatus对象,否则进入到handleExistingTransaction。下面来看这个方法

1.如果是NEVER传播级别则抛出异常
2. 如果是不支持,则挂起事务,将当前事务对象设置为null,newTransaction设置为false,把线程的相关Threadlocal变量改的就像当前不存在事务一样
3. 如果是required_NEW的话,则挂起当前事务,同时创建一个新的事务,执行doBegin操作,设置newTransaction为true
4.如果是嵌入事务,则创建一个SAVEPOINT
5.对于REQUIRED传播级别会把newTransaction标志位设置为false

/**
 * Create a TransactionStatus for an existing transaction.
 */
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {
    //如果是NEVER传播级别则抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    // 如果是不支持,则挂起事务
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        //挂起事务同时将当前事务设置为null,newTransaction设置为false,把线程的相关Threadlocal变量改的就像当前不存在事务一样
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    //如果是required_NEW的话,则挂起当前事务,同时创建一个新的事务,执行doBegin操作
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }
    // 如果是嵌入事务,则创建一个SAVEPOINT
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
//这里判断是否需要对已经存在的事务进行校验,这个可以通过AbstractPlatformTransactionManager.setValidateExistingTransaction(boolean)来设置,设置为true后需要校验当前事务的隔离级别和已经存在的事务的隔离级别是否一致 
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    // 如果不设置是否校验已经存在的事务,则对于REQUIRED传播级别会走到这里来,这里把newTransaction标志位设置为false,
    // 这里用的definition是当前事务的相关属性,所以隔离级别等依然是当前事务的(子事务),而不是已经存在的事务的隔离级别(父事务)
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
View Code

隔离级别对回滚事务的影响

  1. 如果newTransaction设置为ture,则真正执行回滚。
  2. 如果有保存点,则回滚到保存点。
  3. 否则不会真正回滚,而是设置回滚标志位。
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
            boolean unexpectedRollback = unexpected;
            try {
                triggerBeforeCompletion(status);
                // 如果有保存点
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    status.rollbackToHeldSavepoint();
                }
                // 如果是新的事务,当传播级别为RUQUIRED_NEW时会走到这里来
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    doRollback(status);
                }
                else {
                    // 加入到事务中,设置回滚状态,适用于REQUIRED传播级别
                    // 并不会真的回滚,而是设置回滚标志位
                    // Participating in larger transaction
                    if (status.hasTransaction()) {
                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                            }
                            doSetRollbackOnly(status);
                        }
                        else {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                            }
                        }
                    }
                    else {
                        logger.debug("Should roll back transaction but cannot - no transaction available");
                    }
                    // Unexpected rollback only matters here if we're asked to fail early
                    if (!isFailEarlyOnGlobalRollbackOnly()) {
                        unexpectedRollback = false;
                    }
                }
            }
            catch (RuntimeException | Error ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }

            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

            // Raise UnexpectedRollbackException if we had a global rollback-only marker
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }
View Code

隔离级别对提交事务的影响
就算事务方法没有抛出异常,走到了commit方法中,但是依然有可能回滚事务。
对于REQUIRED传播级别,即使父事务中没有抛出异常,但是子事务中已经设置了回滚标志,那么父事务依然会回滚
只有newTransaction标志位为true的事务才会真正执行commit操作。

@Override
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }
 // 对于REQUIRED传播级别,即使父事务中没有抛出异常,但是子事务中已经设置了回滚标志,那么父事务依然会回滚
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    processCommit(defStatus);
}

为什么我的事务不生效
1.如果不是Innodb存储引擎,MyISAM不支持事务。
2.没有指定rollbackFor参数。
3. 没有指定transactionManager参数,默认的transactionManager并不是我期望的,以及一个事务中涉及到了多个数据库。
4. 如果AOP使用了JDK动态代理,对象内部方法互相调用不会被Spring的AOP拦截,@Transactional注解无效。
5. 如果AOP使用了CGLIB代理,事务方法或者类不是public,无法被外部包访问到,或者是final无法继承,@transactional注解无效。

原文链接:https://blog.csdn.net/weixin_44366439/article/details/89030080

参考: https://www.cnblogs.com/haha12/p/11855001.html

原文地址:https://www.cnblogs.com/yrjns/p/11945043.html