sharding-jdbc源码解析

参考博客:https://cloud.tencent.com/developer/article/1529692

 

 看sharding-jdbc支持XA协议重点看下面的代码

 sharding-transaction-xa-atomikos模块中主要是原生的atomikos的配置,atomikos事务配置都是在transactions.properties中进行配置的

sharding-transaction模块由sharding-transaction-core,sharding-transaction-2pc和sharding-transaction-base这3个子模块组成。

Apache ShardingSphere(Incubating)能够自动将XADataSource作为数据库驱动的数据源接入XA事务管理器。而针对于使用DataSource作为数据库驱动的应用,用户也无需改变其编码以及配置,Apache ShardingSphere(Incubating)通过自动适配的方式,在中间件内部将其转化为支持XA协议的XADataSource和XAConnection,并将其作为XA资源注册到底层的XA事务管理器中。

ShardingSphere还会实现XA协议的recovery部分,即在事务处理器出现崩溃的情况时,可以有能力提供in-doubt transactions来实现事务恢复。

不支持informix数据库atomikosTransactionManager

 

 sharding-jdbc支持事务上面的两个注解是需要的

 @Transactional表示底层使用spring的事务管理,spring底层的事务管理器使用默认的com.atomikos.icatch.jta.UserTransactionManager

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
 
 
    <!--==========针对两个库,各配置一个AtomikosDataSourceBean,底层都使用MysqlXADataSource=====================-->
    <!--配置数据源db_user-->
    <bean id="db_user" class="com.atomikos.jdbc.AtomikosDataSourceBean"
          init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="ds1" />
        <property name="xaDataSourceClassName"
                  value="com.mysql.cj.jdbc.MysqlXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="url">jdbc:mysql://localhost:3306/db_user?serverTimezone=UTC</prop>
                <prop key="user">root</prop>
                <prop key="password">123456</prop>
            </props>
        </property>
    </bean>
 
    <!--配置数据源db_account-->
    <bean id="db_account" class="com.atomikos.jdbc.AtomikosDataSourceBean"
          init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="ds2" />
        <property name="xaDataSourceClassName"
                  value="com.mysql.cj.jdbc.MysqlXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="url">jdbc:mysql://localhost:3306/db_account?serverTimezone=UTC</prop>
                <prop key="user">root</prop>
                <prop key="password">123456</prop>
            </props>
        </property>
    </bean>
 
    <!--=============针对两个数据源,各配置一个SqlSessionFactoryBean============ -->
    <bean id="ssf_user" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="db_user" />
    </bean>
 
    <bean id="ssf_account" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="db_account" />
    </bean>
 
    <!--=============针对两个SqlSessionFactoryBean,各配置一个MapperScannerConfigurer============ -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="sqlSessionFactoryBeanName" value="ssf_user"/>
        <!--指定com.tianshouzhi.atomikos.mappers.db_user包下的UserMapper接口使用ssf_user获取底层数据库连接-->
        <property name="basePackage" value="com.tianshouzhi.atomikos.mappers.db_user"/>
    </bean>
 
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="sqlSessionFactoryBeanName" value="ssf_account"/>
        <!--指定com.tianshouzhi.atomikos.mappers.ds_account包下的AccountMapper接口使用ssf_account获取底层数据库连接-->
        <property name="basePackage" value="com.tianshouzhi.atomikos.mappers.ds_account"/>
    </bean>
 
    <!--================配置atomikos事务管理器========================-->
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init"
          destroy-method="close">
        <property name="forceShutdown" value="false"/>
    </bean>
 
    <!--============配置spring的JtaTransactionManager,底层委派给atomikos进行处理===============-->
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="atomikosTransactionManager"/>
    </bean>
 
    <!--配置spring声明式事务管理器-->
    <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
 
    <bean id="jtaService" class="com.tianshouzhi.atomikos.JTAService"/>
</beans>

第二个注解 @ShardingTransactionType(TransactionType.XA)也是需要的,改注解sharding-jdbc将一般的数据源拦截成XADatasource,将一般的事务管理器封装成XA的事务管理器atomikosTransactionManager

这里的原理在于

https://blog.csdn.net/liu1390910/article/details/94554356

 

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

com.atomikos.icatch.serial_jta_transactions = false
com.atomikos.icatch.default_jta_timeout = 300000
com.atomikos.icatch.max_actives = 10000
com.atomikos.icatch.checkpoint_interval = 50000
com.atomikos.icatch.enable_logging = true
com.atomikos.icatch.log_base_name = xa_tx
com.atomikos.icatch.log_base_dir = ./logs

atomikos 创建数据源,报Max number of active transactions

 
技术小美 2017-11-15 13:25:00 浏览1828
 
在使用atomikos 事务管理中,当并发数超过50的时候会产生异常如下:
java.lang.IllegalStateException:Max number of active transactions reched:50

原因:

atomikos的默认配置中 transactions.properties中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE
# THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER
# UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;
 
# Required: factory implementation class of the transaction core.
# NOTE: there is no default for this, so it MUST be specified! 
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
 
         
# Set base name of file where messages are output 
# (also known as the 'console file').
#
# com.atomikos.icatch.console_file_name = tm.out
 
# Size limit (in bytes) for the console file;
# negative means unlimited.
#
# com.atomikos.icatch.console_file_limit=-1
 
# For size-limited console files, this option
# specifies a number of rotating files to 
# maintain.
#
# com.atomikos.icatch.console_file_count=1
 
# Set the number of log writes between checkpoints
#
# com.atomikos.icatch.checkpoint_interval=500
 
# Set output directory where console file and other files are to be put
# make sure this directory exists!
#
# com.atomikos.icatch.output_dir = ./
 
# Set directory of log files; make sure this directory exists!
#
# com.atomikos.icatch.log_base_dir = ./
 
# Set base name of log file
this name will be  used as the first part of 
# the system-generated log file name
#
# com.atomikos.icatch.log_base_name = tmlog
 
# Set the max number of active local transactions 
# or -1 for unlimited.
#
# com.atomikos.icatch.max_actives = 50 (原因)
 
# Set the default timeout (in milliseconds) for local transactions
#
# com.atomikos.icatch.default_jta_timeout = 10000
 
# Set the max timeout (in milliseconds) for local transactions
#
# com.atomikos.icatch.max_timeout = 300000
 
# The globally unique name of this transaction manager process
# override this value with a globally unique name
#
# com.atomikos.icatch.tm_unique_name = tm
     
# Do we want to use parallel subtransactions? JTA's default
# is NO for J2EE compatibility
#
# com.atomikos.icatch.serial_jta_transactions=true
                     
# If you want to do explicit resource registration then
# you need to set this value to false.
#
# com.atomikos.icatch.automatic_resource_registration=true  
     
# Set this to WARN, INFO or DEBUG to control the granularity
# of output to the console file.
#
# com.atomikos.icatch.console_log_level=WARN
     
# Do you want transaction logging to be enabled or not?
# If set to false, then no logging overhead will be done
# at the risk of losing data after restart or crash.
#
# com.atomikos.icatch.enable_logging=true
 
# Should two-phase commit be done in (multi-)threaded mode or not?
# Set this to false if you want commits to be ordered according
# to the order in which resources are added to the transaction.
#
# NOTE: threads are reused on JDK 1.5 or higher. 
# For JDK 1.4, thread reuse is enabled as soon as the 
# concurrent backport is in the classpath - see 
# http://mirrors.ibiblio.org/pub/mirrors/maven2/backport-util-concurrent/backport-util-concurrent/
#
# com.atomikos.icatch.threaded_2pc=false
 
# Should shutdown of the VM trigger shutdown of the transaction core too?
#
# com.atomikos.icatch.force_shutdown_on_vm_exit=false

修改默认配置中的:

1
com.atomikos.icatch.max_actives = 50------改为更大就可以解决
 
http://www.voidcn.com/article/p-odlbtmrm-boe.html

 使用atomikos时,事务默认超时时间是100000毫秒,超过这个时间,提交事务就会抛出异常com.atomikos.icatch.RollbackException: Prepare: NO vote。

    今天总算通过bing找到了答案,记录一下。

    在网上很多说的要设置com.atomikos.icatch.max_timeout和com.atomikos.icatch.default_jta_timeout,居然都没说怎么设置,集成Spring的情况下,第一时间想到在bean的属性里配置,结果没找到(default_jta_timeout可以在org.springframework.transaction.jta.JtaTransactionManager中找到defaultTimeOut属性与之匹配,但是max_timeout没有找到,官方文档(https://www.atomikos.com/Documentation/JtaProperties)也没说,只是说设置max_timeout和UserTransaction.setTransactionTimeout()是一个意思,结果我配置了半天,没有效果)。最后搞明白了,需要在classpath下建一个jta.properties(或者transactions.properties)文件(事务管理器的配置),来配置事务相关属性,如下是我的jta.properties。

    PS:default_jta_timeout表示开启事务时,默认的超时时间,max_timeout表示最大的超时时间,0表示无限时间。如果default_jta_timeout设置的值超过了max_timeout,会自动将超时时间截断,使用max_timeout的值(日志会打印出来)。

 然后,Spring的配置文件

<!-- Atomikos 事务管理器配置 -->
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
        init-method="init" destroy-method="close">
        <!-- <property name="startupTransactionService" value="false" /> -->
        <!-- close()时是否强制终止事务 -->
        <property name="forceShutdown" value="false" />
    </bean>

    <!-- Atomikos UserTransaction配置 -->
    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"></bean>

    <!-- JTA事务管理器 -->
    <bean id="txManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager">
            <ref bean="atomikosTransactionManager" />
        </property>
        <property name="userTransaction">
            <ref bean="atomikosUserTransaction" />
        </property>
    </bean>
    <tx:annotation-driven transaction-manager="txManager" proxy-target-class="true" />

 为XA和非XA提供内置的JDBC适配器,所有不再需要配置多余的东西

 

 sharding-transaction-core主要的功能在于ShardingTransactionManager类以spi扩展的是是哪些事务类型来生成对于的事务管理器,例如当前是XA的atomitx事务管理器,那么这里ShardingTransactionManager对于的事务管理器就是com.atomikos.icatch.jta.UserTransactionManager

 

 sharding-transaction-spring模块就是扫描断言的,当你的方法配置了@ShardingTransactionType (TransactionType.XA)注解之后,harding-transaction-spring模块会拦截到改注解,通过Threadlocal切换当前事务类型,适用于Sharding-JDBC。例如:TransactionTypeHolder.set (TransactionType.XA),代码如下

 Apache ShardingSphere(Incubating)官方目前实现了基于Atomikos和Bitronix的SPI,并且邀请了Radhat JBoss的XA事务引擎Narayana [https://github.com/jbosstm/narayana] 开发团队实现了JBoss的SPI。用户可以自行的在Atomikos,Bitronix和Narayana间选择自己喜欢的XA事务管理器。

 sharding-transaction-xa-sp模块就是sharding-jdbc提供的模块,将外部的XAresource资源封装到事务管理器中XATransactionManager中,这里XA

原文地址:https://www.cnblogs.com/kebibuluan/p/12887116.html