四、DataSourceProxy注册为Resource

所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

正文

前面两篇文章,主要了解了一下关于seata是怎么通过AOP给方法事务增强的,且关于TransactionalTemplate是如何执行事务的being -> commit || rollback的。本文将开始涉及关于数据源代理的部分。

在阅读seata自动配置相关的GlobalTransactionScanner这个类的时候提了一下数据源代理的事情,数据源代理是seata实现分布式事务非常重要的点。并且,数据源代理类DataSourceProxy和DataSource是一种组合代理关系。

了解数据源代理,我们先从DataSourceProxy这个类开始

DataSourceProxy数据源代理

我们先看一下它的UML类图

DataSourceProxy直接间接实现了DataSource,也就是可以直接被当作DataSource来使用。同时DataSourceProxy实现了Resource,将作为ResourceManager管理的资源对象。

AbstractDataSourceProxy

作为对DataSource接口直接实现的抽象类,我们看看它包含了哪些实现。

public abstract class AbstractDataSourceProxy implements DataSource {

    /**
     * 目标数据源
     */
    protected DataSource targetDataSource;

    /**
     * 构造方法传入目标数据源
     */
    public AbstractDataSourceProxy(DataSource targetDataSource) {
        this.targetDataSource = targetDataSource;
    }

    /**
     * 获取目标数据源
     */
    public DataSource getTargetDataSource() {
        return targetDataSource;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return targetDataSource.unwrap(iface);
    }

    // 省略部分 override方法
}

可以看到,AbstractDataSourceProxy定义了一个构造方法,要求传入原始的数据源。

override的方法将直接调用原始数据源的方法。由此可见,AbstractDataSourceProxy并未实现事务代理相关的内容,需要由之类来实现。

DataSourceProxy

DataSourceProxy直接继承自AbstractDataSourceProxy,我们直接打开它的构造方法看看。

public DataSourceProxy(DataSource targetDataSource) {
    this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
}

跟进this

public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
    super(targetDataSource);
    // 初始化
    init(targetDataSource, resourceGroupId);
}

在构造DataSourceProxy的过程中进行了初始化操作,跟进init方法

private void init(DataSource dataSource, String resourceGroupId) {
    this.resourceGroupId = resourceGroupId;
    // 获取原始dataSource的Connection
    try (Connection connection = dataSource.getConnection()) {
        // 从Connection中获取URL地址
        jdbcUrl = connection.getMetaData().getURL();
        // 从Connection中获取数据库类型
        dbType = JdbcUtils.getDbType(jdbcUrl, null);
    } catch (SQLException e) {
        throw new IllegalStateException("can not init dataSource", e);
    }
    // 注册当前对象到ResourceManager
    DefaultResourceManager.get().registerResource(this);
    if (ENABLE_TABLE_META_CHECKER_ENABLE) {
        // 定时任务校验表的元数据
        tableMetaExcutor.scheduleAtFixedRate(() -> {
            try (Connection connection = dataSource.getConnection()) {
                TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()).refresh(connection, DataSourceProxy.this.getResourceId());
            } catch (Exception e) {
            }
        }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
    }
}

在init方法中主要是做了一件事情,将当前对象注册到ResourceManager中。前面我们说过,DataSourceProxy这个对象实现了Resource,作为被ResourceManager直接管控的资源来使用。

这里作为Resource被注册到ResourceManager中,我们跟进registerResource方法看看注册过程

@Override
public void registerResource(Resource resource) {
    // 根据branchType选择了ResourceManager,调用其注册方法
    getResourceManager(resource.getBranchType()).registerResource(resource);
}

每个Resource都属于一种branchType,branch叫做分支事务,属于全局事务当中的一个节点。

DataSourceProxy的branchType是什么类型呢?打开getBranchType方法看看

@Override
public BranchType getBranchType() {
    return BranchType.AT;
}

是AT类型,也就是自动事务。那么getResourceManager将会获取到什么ResourceManager?

protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();

private DefaultResourceManager() {
    initResourceManagers();
}

protected void initResourceManagers() {
    // SPI机制加载所有的ResourceManager
    List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
    if (CollectionUtils.isNotEmpty(allResourceManagers)) {
        for (ResourceManager rm : allResourceManagers) {
            resourceManagers.put(rm.getBranchType(), rm);
        }
    }
}   

public ResourceManager getResourceManager(BranchType branchType) {
    // 直接从集合中获取
    ResourceManager rm = resourceManagers.get(branchType);
    return rm;
}

ResourceManager是采用了SPI机制加载实现类,每个ResourceManager对应一个branchType,通过branchType直接从hash集合中取出了。

那么branchType=AT对应的是哪个ResourceManager实现类呢?

是DataSourceManager

到这里,我们似乎看到了和GlobalTransaction相似的地方,GlobalTransaction把相关的操作委托给TransactionManager来做。而DataSourceProxy明显把registerResource这件事委托给了ResourceManager来做。(后面的ConnectionProxy也是如此)

DataSourceManager.registerResource

我们跟进DataSourceManager的registerResource方法看看资源注册

private Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();

@Override
public void registerResource(Resource resource) {
    DataSourceProxy dataSourceProxy = (DataSourceProxy)resource;
    // 先加入本地缓存
    dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
    // 调用父级的注册
    super.registerResource(dataSourceProxy);
}

DataSourceManager本地缓存了一份数据,而后调用父级注册方法,跟进父级方法

@Override
public void registerResource(Resource resource) {
    RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}

父级方法比较简单,就是直接将当前Resource的groupId和resourceId通过RPC发送给Server端,从而注册当前Resource。

总结

到这里,一个以DataSource作为Resource的资源注册过程就结束了,其实相对简单,就是在构造DataSourceProxy方法的时候,发送RPC到Server端,增加一份数据而已。

在Seata的GlobalTransactionScanner的postProcessAfterInitialization中,原始配置的DataSource将会被代理为DataSourceProxy数据源代理对象。

而DataSourceProxy构造过程中会调用init初始化方法,进行Resource的注册。

原文地址:https://www.cnblogs.com/lay2017/p/12465620.html