一、seata自动配置

所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

正文

上一篇文章中,展示了springboot如何引入并使用seata来实现分布式事务的,基本使用以后接下来将开始进行源代码的阅读。毕竟阅读源代码总是比阅读文档令人有兴趣一点,而且了解他人的编码思路似乎也算是一个跨时空的交流?

作为源码篇的开篇, 将会阅读springboot引入seata进行自动配置的部分。

自动配置类SeataAutoConfiguration

seata的自动配置类命名非常的直接,就叫做:SeataAutoConfiguration,我们打开这个类

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
    
}

首先,@Configuration表明,SeataAutoConfiguration被定义为了spring的配置类。

@ConditionalOnProperty将配置类生效条件设置为seata.enabled=true,默认值是true,所以可以开关分布式事务功能。

@EnableConfigurationProperties将配置包转成了一个SeataProperties的Bean对象来使用。

@ComponentScan扫描了一下properties包,加载了一大堆类似SeataProperties的Bean对象。

接下来阅读SeataAutoConfiguration的内部代码

@Autowired
private SeataProperties seataProperties;

@Bean
public SpringUtils springUtils() {
    return new SpringUtils();
}

@Bean
@DependsOn({"springUtils"})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner() {return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup());
}

SpringUtils是一个实现了ApplicationContextAware的工具包,可以便捷地从容器当中getBean操作。

自动配置的核心点落在了下面的一个Bean,GlobalTransactionScanner。

我们看到构造这个Bean非常的简单,构造方法只需要一个applicationId和txServiceGroup。

applicationId: 就是spring.application.name=你定义的当前应用的名字,例如:userService

txServiceGroup: 就是以applicationId 加上 -seata-service-group命名的,例如:userService-seata-service-group。如果版本较低的话,那时候可能还不叫seata而是fescar,因此默认命名就是以fescar为后缀。

new了一个GlobalTransactionScanner对象,SeataAutoConfiguration这个自动配置类的作用就结束了。有点草率?是的,不过不影响。毕竟SeataAutoConfiguration只是做了一个启动引导的作用。

GlobalTransactionScanner主体逻辑

既然核心点落在GlobalTransactionScanner这个类,我们继续关注它。看这个名字其实就可以猜测到一点它的作用,扫描@GlobalTransactional这个注解,并对代理方法进行拦截增强事务的功能。

要了解这个类,不得不先阅读一下它的UML图

可以看到,GlobalTransactionScanner主要有4个点值得关注:

1)Disposable接口,表达了spring容器销毁的时候会进行一些操作

2)InitializingBean接口,表达了初始化的时候会进行一些操作

3)AbstractAutoProxyCreator表示它会对spring容器中的Bean进行切面增强,也就是我们上面的拦截事务增强的猜测。

4)ApplicationContextAware表示可以拿到spring容器

这里我们稍微关注一下这4个的执行顺序:

ApplicationContextAware -> InitializingBean -> AbstractAutoProxyCreator -> DisposableBean

我们重点关注一下InitializingBean和AbstractAutoProxyCreator的内容

InitializingBean

@Override
public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        return;
    }
    initClient();
}

初始化Seata的Client端的东西,Client端主要包括TransactionManager和ResourceManager。或许是为了简化吧,并没有把initClient这件事从GlobalTransactionScanner里面独立出来一个类。

跟进initClient方法

private void initClient() {
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(
            "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
    }
    
    //init TM
    TMClient.init(applicationId, txServiceGroup);
   
    //init RM
    RMClient.init(applicationId, txServiceGroup);
  
    registerSpringShutdownHook();
}

initClient逻辑并不复杂,单纯调用TMClient.init初始化TransactionManager的RPC客户端,RMClient.init初始化ResourceManager的RPC客户端。seata的RPC采用netty来实现,seata封装简化了一下使用。

TMClient比较简单,当初初始化RPC组件

public static void init(String applicationId, String transactionServiceGroup) {
    TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
    tmRpcClient.init();
}

我们关注一下RMClient的init方法

public static void init(String applicationId, String transactionServiceGroup) {
    // 获取单例对象
    RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
    // 设置ResourceManager的单例对象
    rmRpcClient.setResourceManager(DefaultResourceManager.get());
    // 添加监听器,监听Server端的消息推送
    rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
    // 初始化RPC
    rmRpcClient.init();
}

和TMClient想比,RMClient多出了一个监听Server端消息并处理的机制。也就是说TM的职责更多的是主动与Server端通信,比如:全局事务的begin、commit、rollback等。

而RM除了主动操作本地资源外,还会因为全局事务的commit、rollback等的消息推送,从而对本地资源进行相关操作。

AbstractAutoProxyCreator

GlobalTransactionScanner初始化完了TM和RM以后,我们再关注一下AbstractAutoProxyCreator,自动代理。

自动代理,它代理啥东西呢?或者说它给spring中的Bean增强了什么功能?

GlobalTransactionScanner主要扩展了AbstractAutoProxyCreator的两个方法

1)wrapIfNecessary:代理增强的前置判断处理,表示是否该Bean需要增强,如果增强的话创建代理类

2)postProcessAfterInitialization:当一个spring的Bean已经初始化完毕的时候,后置处理一些东西

wrapIfNecessary前置处理

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            // 相同Bean排重
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }

            interceptor = null;
            // 判断是否开启TCC模式
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                // TCC实现的拦截器
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // 判断是否存在@GlobalTransactional或者@GlobalLock注解
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                if (interceptor == null) {
                    // 非TCC的拦截器
                    interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);
                }
            }
            // 判断当前Bean是否已经是spring的代理类了
            if (!AopUtils.isAopProxy(bean)) {
                // 如果还不是,那么走一轮spring的代理过程即可
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // 如果是一个spring的代理类,那么反射获取代理类中已经存在的拦截器集合,然后添加到该集合当中
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }

            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {}
}

wrapIfNecessary方法较长我们分步骤看看

1)isTccAutoProxy判断是否开启tcc模式,开启的话选择了TccActionInterceptor拦截器,非tcc模式选择GlobalTransactionalInterceptor拦截器,默认不开启

2)existAnnotation判断当前Bean是否有类或者接口的方法存在@GlobalTransactional或者@GlobalLock注解,如果没有则直接返回

3)isAopProxy方法是判断当前的Bean是否已经是spring的代理类了,无论是JDK动态代理还是Cglib类代理。如果是普通的Bean,走原有的生成代理逻辑即可,如果已经是代理类,那么要通过反射获取代理对象内的拦截器集合也叫做Advisor,直接添加到该集合当中。

wrapIfNecessary的方法并不复杂,但是如果对代理不是很熟悉或许对细节点会有些困惑。

postProcessAfterInitialization数据源代理

wrapIfNecessary创建了代理类,最后看看后置处理又做了啥。跟进该方法

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    // 判断是否数据源,且不是数据源代理
    if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
        // 创建静态代理
        DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
        Class<?>[] interfaces = SpringProxyUtils.getAllInterfaces(bean);
        // 创建动态代理类
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
                if (null != m) {
                    // 如果静态代理存在目标对象的代理方法,那么调用该代理方法,从而调用目标方法
                    return m.invoke(dataSourceProxy, args);
                } else {
                    // 如果静态代理不存在目标对象的代理方法,那么直接调用目标对象的代理方法
                    boolean oldAccessible = method.isAccessible();
                    try {
                        method.setAccessible(true);
                        return method.invoke(bean, args);
                    } finally {
                        //recover the original accessible for security reason
                        method.setAccessible(oldAccessible);
                    }
                }
            }
        });
    }
    // 不需要处理数据源代理的,按照原有逻辑处理
    return super.postProcessAfterInitialization(bean, beanName);
}

这里有两大块逻辑:

1)是数据源,且需要进行数据源代理的,那么特立独行地走if内的逻辑

2)不需要数据源代理的,那么走原有逻辑

我们关注一下数据源自代理的逻辑,数据源代理或许是seata非常重要的实现之一

首先,DataSource是一个接口,DataSourceProxy对DataSource是一种实现的关系。但是,请注意!!!spring中的数据源的Bean和DataSource是实现关系,可是该Bean和DataSourceProxy并不是继承或者实现的关系,而是组合关系。

因此,DataSourceProxy作为数据源Bean的静态代理而存在,而它实现了DataSource的接口,但是很有可能并未实现Bean的一些接口。

这样一来,就很有必要创建一个动态代理类,来关联一下DataSourceProxy和Bean之间的关系了。如果DataSourceProxy代理过的方法,那么调用代理方法,如果没有就直接调用Bean中的方法。

后置处理的大体逻辑就是这样,基本上就是对数据源进行自动代理处理。

总结

本文主要是自动配置的主体逻辑,自动配置围绕着GlobalTransactionScanner这个Bean展开。核心逻辑主要是三块:

1)初始化TransactionManager和ResourceManager的RPC客户端

2) 对@GlobalTransactional和@GlobalLock注解的方法进行增强

3)数据源代理

原文地址:https://www.cnblogs.com/lay2017/p/12434848.html