ShardingSphere~5

Transaction 

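The basic management interface, ShardingTransactionManager, looks familiar (init, begin, commit, rollback, close and so on); there is nothing special about it.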
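
For readers less used to this interface shape, here is a minimal sketch of how begin/commit/rollback are typically driven by a caller. It is not ShardingSphere code: SimpleTransactionManager, RecordingTransactionManager and TransactionRunner are hypothetical names made up for this illustration, and the real interface has more methods than the three shown.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, trimmed-down view of the begin/commit/rollback part of such an interface. */
interface SimpleTransactionManager {
    void begin();

    void commit();

    void rollback();
}

/** Records each call so the order of operations can be inspected. */
final class RecordingTransactionManager implements SimpleTransactionManager {
    final List<String> calls = new ArrayList<>();

    @Override
    public void begin() {
        calls.add("begin");
    }

    @Override
    public void commit() {
        calls.add("commit");
    }

    @Override
    public void rollback() {
        calls.add("rollback");
    }
}

final class TransactionRunner {
    /** Runs the work inside a transaction; returns true if it committed, false if it rolled back. */
    static boolean run(SimpleTransactionManager manager, Runnable work) {
        manager.begin();
        try {
            work.run();
            manager.commit();
            return true;
        } catch (RuntimeException ex) {
            // Any failure in the work (or in commit) leads to a rollback in this sketch.
            manager.rollback();
            return false;
        }
    }
}
```

Calling TransactionRunner.run with work that throws records begin followed by rollback; work that returns normally records begin followed by commit.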

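There are two kinds of concrete implementation: a two-phase transaction model based on XA, and a flexible (BASE) transaction model based on Seata AT.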

Classic flow diagrams for XA and Seata AT

The diagrams on the official site are fairly clear.

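XA transaction management model: XAShardingTransactionManager

It manages and adapts multiple data sources, and delegates beginning, committing and rolling back transactions to the concrete XA transaction manager.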

public final class XAShardingTransactionManager implements ShardingTransactionManager {
    
    private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>();
    
    private XATransactionManager xaTransactionManager;

    @Override
    public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources, final String transactionMangerType) {
        xaTransactionManager = XATransactionManagerLoader.getInstance().getXATransactionManager(transactionMangerType);
        xaTransactionManager.init();
        resourceDataSources.forEach(each -> cachedDataSources.put(each.getOriginalName(), newXATransactionDataSource(databaseType, each)));
    }

    private XATransactionDataSource newXATransactionDataSource(final DatabaseType databaseType, final ResourceDataSource resourceDataSource) {
        String resourceName = resourceDataSource.getUniqueResourceName();
        DataSource dataSource = resourceDataSource.getDataSource();
        return new XATransactionDataSource(databaseType, resourceName, dataSource, xaTransactionManager);
    }
    
    @Override
    public TransactionType getTransactionType() {
        return TransactionType.XA;
    }
    
    @SneakyThrows(SystemException.class)
    @Override
    public boolean isInTransaction() {
        return Status.STATUS_NO_TRANSACTION != xaTransactionManager.getTransactionManager().getStatus();
    }
    
    @Override
    public Connection getConnection(final String dataSourceName) throws SQLException {
        try {
            return cachedDataSources.get(dataSourceName).getConnection();
        } catch (final SystemException | RollbackException ex) {
            throw new SQLException(ex);
        }
    }
    
    @SneakyThrows({SystemException.class, NotSupportedException.class})
    @Override
    public void begin() {
        xaTransactionManager.getTransactionManager().begin();
    }
    
    @SneakyThrows({SystemException.class, RollbackException.class, HeuristicMixedException.class, HeuristicRollbackException.class})
    @Override
    public void commit() {
        xaTransactionManager.getTransactionManager().commit();
    }
    
    @SneakyThrows(SystemException.class)
    @Override
    public void rollback() {
        xaTransactionManager.getTransactionManager().rollback();
    }
    
    @Override
    public void close() throws Exception {
        for (XATransactionDataSource each : cachedDataSources.values()) {
            each.close();
        }
        cachedDataSources.clear();
        xaTransactionManager.close();
    }
}

The managed data sources also need to support the XA protocol. Here is a fragment from XATransactionDataSource:

public Connection getConnection() throws SQLException, SystemException, RollbackException {
    if (CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
        return dataSource.getConnection();
    }
    Connection result = dataSource.getConnection();
    XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, result);
    Transaction transaction = xaTransactionManager.getTransactionManager().getTransaction();
    if (!enlistedTransactions.get().contains(transaction)) {
        transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));
        transaction.registerSynchronization(new Synchronization() {
            @Override
            public void beforeCompletion() {
                enlistedTransactions.get().remove(transaction);
            }

            @Override
            public void afterCompletion(final int status) {
                enlistedTransactions.get().clear();
            }
        });
        enlistedTransactions.get().add(transaction);
    }
    return result;
}
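
The key idea in that fragment is that a data source enlists its XAResource only once per transaction, tracked in a thread-local set that is cleaned up when the transaction completes. The following is a minimal sketch of just that bookkeeping, with no JTA dependency; EnlistOnceDataSource and its methods are hypothetical names, and a counter stands in for the real enlistResource call.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of a data source that enlists itself at most once per transaction. */
final class EnlistOnceDataSource {
    // Transactions this data source has already joined, tracked per thread.
    private final ThreadLocal<Set<Object>> enlistedTransactions = ThreadLocal.withInitial(HashSet::new);

    private int enlistCount;

    /** Called whenever a connection is requested; returns true if this call performed the enlistment. */
    boolean enlistIfNeeded(Object transaction) {
        if (enlistedTransactions.get().contains(transaction)) {
            return false;
        }
        enlistCount++; // stands in for transaction.enlistResource(...)
        enlistedTransactions.get().add(transaction);
        return true;
    }

    /** Called when the transaction completes, so that a later transaction enlists afresh. */
    void onCompletion(Object transaction) {
        enlistedTransactions.get().remove(transaction);
    }

    int getEnlistCount() {
        return enlistCount;
    }
}
```

Requesting several connections within the same transaction therefore enlists the resource once; after completion, the next transaction enlists it again.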

An XA connection requires an underlying database that supports XA. Taking MySQL as an example, here is how that support is wired in:

@RequiredArgsConstructor
public final class MySQLXAConnectionWrapper implements XAConnectionWrapper {
    
    private static final String MYSQL_XA_DATASOURCE_5 = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource";
    
    private static final String MYSQL_XA_DATASOURCE_8 = "com.mysql.cj.jdbc.MysqlXADataSource";
    
    @SneakyThrows(ReflectiveOperationException.class)
    @Override
    public XAConnection wrap(final XADataSource xaDataSource, final Connection connection) {
        Connection physicalConnection = unwrapPhysicalConnection(xaDataSource.getClass().getName(), connection);
        Method method = xaDataSource.getClass().getDeclaredMethod("wrapConnection", Connection.class);
        method.setAccessible(true);
        return (XAConnection) method.invoke(xaDataSource, physicalConnection);
    }
    
    @SneakyThrows({SQLException.class, ClassNotFoundException.class})
    private Connection unwrapPhysicalConnection(final String xaDataSourceClassName, final Connection connection) {
        switch (xaDataSourceClassName) {
            case MYSQL_XA_DATASOURCE_5:
                return (Connection) connection.unwrap(Class.forName("com.mysql.jdbc.Connection"));
            case MYSQL_XA_DATASOURCE_8:
                return (Connection) connection.unwrap(Class.forName("com.mysql.cj.jdbc.JdbcConnection"));
            default:
                throw new UnsupportedOperationException(String.format("Cannot support xa datasource: `%s`", xaDataSourceClassName));
        }
    }
}

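From the official documentation:

After XAShardingTransactionManager enlists the XAResource corresponding to the database connection in the current XA transaction, the transaction manager sends the XAResource.start command to the database at this stage. All SQL operations the database receives before the XAResource.end command are marked as part of the XA transaction.

After XAShardingTransactionManager receives the commit command from the access side, it delegates the commit to the actual XA transaction manager. The transaction manager collects all XAResource instances enlisted in the current thread and sends the XAResource.end command to mark the boundary of the XA transaction. It then sends the prepare command to each in turn and collects the votes of all participating XAResource instances. If every XAResource votes positively, it calls commit for the final commit; if any XAResource votes negatively, it calls rollback. Once the transaction manager has issued the commit command, any exception raised by an XAResource is retried through the recovery log, which guarantees atomicity of the commit phase and strong data consistency.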
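
The commit flow described above can be sketched as follows. This is a minimal, self-contained model of two-phase commit, not the code of ShardingSphere or of any real XA transaction manager: Participant, RecordingParticipant and TwoPhaseCoordinator are hypothetical names, and retrying through a recovery log is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an XAResource taking part in one transaction. */
interface Participant {
    void end();          // marks the transaction boundary

    boolean prepare();   // phase 1 vote: true means "ready to commit"

    void commit();       // phase 2 when every vote was positive

    void rollback();     // phase 2 when at least one vote was negative
}

/** Records the calls it receives so the flow can be inspected. */
final class RecordingParticipant implements Participant {
    final List<String> calls = new ArrayList<>();

    private final boolean vote;

    RecordingParticipant(boolean vote) {
        this.vote = vote;
    }

    @Override
    public void end() {
        calls.add("end");
    }

    @Override
    public boolean prepare() {
        calls.add("prepare");
        return vote;
    }

    @Override
    public void commit() {
        calls.add("commit");
    }

    @Override
    public void rollback() {
        calls.add("rollback");
    }
}

final class TwoPhaseCoordinator {
    /** Drives end, prepare, then commit or rollback; returns true if the transaction committed. */
    static boolean complete(List<? extends Participant> participants) {
        for (Participant each : participants) {
            each.end();
        }
        boolean allPrepared = true;
        for (Participant each : participants) {
            // Non-short-circuit: every participant is asked to vote.
            allPrepared &= each.prepare();
        }
        for (Participant each : participants) {
            if (allPrepared) {
                each.commit();
            } else {
                each.rollback();
            }
        }
        return allPrepared;
    }
}
```

With one participant voting yes and another voting no, both see end, prepare and rollback, and complete returns false.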

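What is different about Seata

At first glance, there is an additional class, org.apache.shardingsphere.transaction.base.seata.at.TransactionalSQLExecutionHook; judging from the source, it intercepts SQL execution.

This part relies heavily on the underlying transaction implementation and simply wraps it. The code is pasted below.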
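
One plausible reason to intercept SQL execution in this setting is to carry a thread-bound global transaction ID from the calling thread to worker threads that execute SQL. The sketch below models that idea only; it is an assumption about the purpose, not a copy of the real hook, and XidContext, XidPropagationHook and the TX_XID key are hypothetical names (XidContext stands in for a thread-local context such as Seata's RootContext).

```java
import java.util.Map;

/** Hypothetical thread-bound transaction context. */
final class XidContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) {
        XID.set(xid);
    }

    static String get() {
        return XID.get();
    }

    static void unbind() {
        XID.remove();
    }
}

/** Hypothetical hook invoked before and after each SQL execution. */
final class XidPropagationHook {
    private static final String KEY = "TX_XID";

    private boolean boundHere;

    /** On the calling thread, publish the XID; on a worker thread, bind it if none is bound yet. */
    void start(boolean isTrunkThread, Map<String, Object> sharedData) {
        if (isTrunkThread) {
            if (XidContext.get() != null) {
                sharedData.put(KEY, XidContext.get());
            }
        } else if (XidContext.get() == null && sharedData.containsKey(KEY)) {
            XidContext.bind((String) sharedData.get(KEY));
            boundHere = true;
        }
    }

    /** Undo the binding only if this hook created it. */
    void finish() {
        if (boundHere) {
            XidContext.unbind();
            boundHere = false;
        }
    }
}
```

The worker thread sees the caller's XID only for the duration of the SQL execution, and the binding is removed afterwards so that pooled threads do not leak it into unrelated work.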

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.transaction.base.seata.at;

import com.google.common.base.Preconditions;
import io.seata.config.FileConfiguration;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.core.rpc.netty.RmRpcClient;
import io.seata.core.rpc.netty.TmRpcClient;
import io.seata.rm.RMClient;
import io.seata.rm.datasource.DataSourceProxy;
import io.seata.tm.TMClient;
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.transaction.core.ResourceDataSource;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.spi.ShardingTransactionManager;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Seata AT sharding transaction manager.
 */
public final class SeataATShardingTransactionManager implements ShardingTransactionManager {
    
    private final Map<String, DataSource> dataSourceMap = new HashMap<>();
    
    private final String applicationId;
    
    private final String transactionServiceGroup;
    
    private final boolean enableSeataAT;
    
    public SeataATShardingTransactionManager() {
        FileConfiguration config = new FileConfiguration("seata.conf");
        enableSeataAT = config.getBoolean("sharding.transaction.seata.at.enable", true);
        applicationId = config.getConfig("client.application.id");
        transactionServiceGroup = config.getConfig("client.transaction.service.group", "default");
    }
    
    @Override
    public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources, final String transactionMangerType) {
        if (enableSeataAT) {
            initSeataRPCClient();
            resourceDataSources.forEach(each -> dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource())));
        }
    }
    
    private void initSeataRPCClient() {
        Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file.");
        TMClient.init(applicationId, transactionServiceGroup);
        RMClient.init(applicationId, transactionServiceGroup);
    }
    
    @Override
    public TransactionType getTransactionType() {
        return TransactionType.BASE;
    }
    
    @Override
    public boolean isInTransaction() {
        Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled.");
        return null != RootContext.getXID();
    }
    
    @Override
    public Connection getConnection(final String dataSourceName) throws SQLException {
        Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled.");
        return dataSourceMap.get(dataSourceName).getConnection();
    }
    
    @Override
    @SneakyThrows(TransactionException.class)
    public void begin() {
        Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled.");
        GlobalTransaction globalTransaction = GlobalTransactionContext.getCurrentOrCreate();
        globalTransaction.begin();
        SeataTransactionHolder.set(globalTransaction);
    }
    
    @Override
    @SneakyThrows(TransactionException.class)
    public void commit() {
        Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled.");
        try {
            SeataTransactionHolder.get().commit();
        } finally {
            SeataTransactionHolder.clear();
            RootContext.unbind();
        }
    }
    
    @Override
    @SneakyThrows(TransactionException.class)
    public void rollback() {
        Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled.");
        try {
            SeataTransactionHolder.get().rollback();
        } finally {
            SeataTransactionHolder.clear();
            RootContext.unbind();
        }
    }
    
    @Override
    public void close() {
        dataSourceMap.clear();
        SeataTransactionHolder.clear();
        TmRpcClient.getInstance().destroy();
        RmRpcClient.getInstance().destroy();
    }
}
Original article: https://www.cnblogs.com/it-worker365/p/14991735.html