springboot整合动态多数据源+分布式事务(亲测可用)

1.导入相关的依赖

 <!-- MySQL JDBC driver. The author recommends pinning version 6.0.6 here and reports that newer versions raise errors. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.6</version>
        </dependency>

        <!-- Atomikos distributed (JTA) transactions -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

 <!-- Alibaba Druid database connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <!-- Common utility classes -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

  

2.相关的yml配置

# The Java configuration in this article reads keys such as
# "spring.datasource.druid.master.url", so the block must live under "spring:".
spring:
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    druid:
      # Master (primary) data source
      master:
        url: jdbc:mysql://localhost:3306/saas_master?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false&pinGlobalTxToPhysicalConnection=true
        username: root
        password: 
      # Slave (secondary) data source
      slave:
        # Switch for the slave data source (turned on in this example)
        open: true
        type: com.alibaba.druid.pool.xa.DruidXADataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/saas_slave?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false&pinGlobalTxToPhysicalConnection=true
        username: root
        password: 

  

3.利用AOP原理来动态切换数据源。相关数据源的处理,网上的做法基本差不多,本人也是参考了很多博主的文章得来的,至于具体是哪些博主就不太记得了,将就着看吧。

package com.zt.common.annotation;

import com.zt.common.enums.DataSourceType;

import java.lang.annotation.*;

/**
 * Custom annotation used to select which data source a method or type should use.
 *
 * @author lyh
 */
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DataSource {

    /**
     * Name of the data source to switch to; {@link DataSourceType#MASTER} when not given.
     */
    DataSourceType value() default DataSourceType.MASTER;
}
package com.zt.common.aspect;


import com.zt.common.annotation.DataSource;
import com.zt.common.datasource.DynamicDataSourceContextHolder;
import com.zt.common.utils.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
 * Aspect that switches the current thread's data source key around every method
 * that is annotated with {@link DataSource} or declared in a type annotated with it.
 *
 * @author lyh
 */
@Aspect
@Order(1)
@Component
public class DataSourceAspect {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Matches methods carrying {@link DataSource} and methods of types carrying it.
     */
    @Pointcut("@annotation(com.zt.common.annotation.DataSource)"
            + "|| @within(com.zt.common.annotation.DataSource)")
    public void dsPointCut() {
        // pointcut signature only
    }

    /**
     * Selects the annotated data source for the duration of the intercepted call.
     *
     * <p>The key that was active before the call is restored afterwards instead of being
     * cleared unconditionally, so a nested annotated call no longer wipes out the
     * selection made by its (still running) annotated caller.
     *
     * @param point the intercepted invocation
     * @return whatever the intercepted method returns
     * @throws Throwable whatever the intercepted method throws
     */
    @Around("dsPointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        String previousType = DynamicDataSourceContextHolder.getDateSoureType();
        DataSource dataSource = getDataSource(point);

        if (dataSource != null) {
            DynamicDataSourceContextHolder.setDataSourceType(dataSource.value().name());
        }

        try {
            return point.proceed();
        } finally {
            // Put the thread back into the state it had before this advice ran.
            if (previousType == null) {
                DynamicDataSourceContextHolder.clearDataSourceType();
            } else {
                DynamicDataSourceContextHolder.setDataSourceType(previousType);
            }
        }
    }

    /**
     * Finds the {@link DataSource} annotation that applies to the intercepted call.
     *
     * <p>The annotation on the method wins over the one on the target class, so a
     * method can override the default declared for its whole class.
     *
     * @param point the intercepted invocation
     * @return the applicable annotation, or {@code null} when neither the method nor
     *         the target class carries one
     */
    public DataSource getDataSource(ProceedingJoinPoint point) {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        DataSource methodLevel = method.getAnnotation(DataSource.class);
        if (methodLevel != null) {
            return methodLevel;
        }
        Class<?> targetClass = point.getTarget().getClass();
        return targetClass.getAnnotation(DataSource.class);
    }
}
package com.zt.common.datasource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Holds the data source key selected for the current thread.
 */
public class DynamicDataSourceContextHolder {

    public static final Logger log = LoggerFactory.getLogger(DynamicDataSourceContextHolder.class);

    /**
     * Per-thread storage of the selected key: every thread sees and changes only its own value.
     */
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();

    /**
     * Ids (aliases) of the known data sources, consulted by {@link #containsDataSource(String)}.
     * Nothing in this class adds entries; callers populate the list themselves.
     */
    public static List<String> dataSourceIds = new ArrayList<String>();

    /**
     * Selects the data source key for the current thread.
     *
     * @param dsType key of the data source to use
     */
    public static void setDataSourceType(String dsType) {
        log.info("切换到{}数据源", dsType);
        CONTEXT_HOLDER.set(dsType);
    }

    /**
     * Returns the data source key selected for the current thread.
     *
     * @return the key, or {@code null} when none has been set on this thread
     */
    public static String getDataSourceType() {
        return CONTEXT_HOLDER.get();
    }

    /**
     * Misspelled predecessor of {@link #getDataSourceType()}, kept so existing callers keep compiling.
     *
     * @return the key, or {@code null} when none has been set on this thread
     */
    public static String getDateSoureType() {
        return getDataSourceType();
    }

    /**
     * Removes the data source key of the current thread.
     */
    public static void clearDataSourceType() {
        CONTEXT_HOLDER.remove();
    }

    /**
     * Tells whether the given id is present in {@link #dataSourceIds}.
     *
     * @param dataSourceId id to look up
     * @return {@code true} when the id is registered
     */
    public static boolean containsDataSource(String dataSourceId) {
        return dataSourceIds.contains(dataSourceId);
    }
}
package com.zt.common.datasource;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.Map;

/**
 * Routing data source whose lookup key is the value held by
 * {@link DynamicDataSourceContextHolder} for the current thread.
 */
public class DynamicDataSource extends AbstractRoutingDataSource {

    /**
     * Registers the default and the keyed target data sources and initializes the router.
     *
     * @param defaultTargetDataSource data source passed to {@code setDefaultTargetDataSource}
     * @param targetDataSources       lookup key to data source mapping
     */
    public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) {
        super.setDefaultTargetDataSource(defaultTargetDataSource);
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
    }

    /**
     * Supplies the lookup key for the current call.
     *
     * @return the key stored through {@code DynamicDataSourceContextHolder.setDataSourceType},
     *         or {@code null} when the current thread has none
     */
    @Override
    protected Object determineCurrentLookupKey() {
        // The holder is written by the code that switches data sources; this router only reads it.
        final String currentKey = DynamicDataSourceContextHolder.getDateSoureType();
        return currentKey;
    }
}
package com.zt.common.config;

import com.zt.common.datasource.DynamicDataSource;
import com.zt.common.enums.DataSourceType;
import com.zt.common.interceptor.PrepareInterceptor;
import com.zt.common.transaction.MultiDataSourceTransactionFactory;
import com.zt.common.transaction.PackagesSqlSessionFactoryBean;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.mybatis.spring.boot.autoconfigure.SpringBootVFS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.lang.Nullable;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* druid 配置多数据源
*
* @author lyh
*/
@Configuration
@EnableTransactionManagement //开启事务
//@MapperScan("com.zt.*.mapper")
@Import({PrepareInterceptor.class})
public class DruidMutilConfig {

@Autowired
PrepareInterceptor prepareInterceptor;

@Bean(name = "masterDataSource")
public DataSource masterDataSource(Environment env) {
String sourceName = "master";
Properties prop = build(env, "spring.datasource.druid.master.");
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
//druid的数据库驱动换成xa的
xaDataSource.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
xaDataSource.setUniqueResourceName(sourceName);
//下面两行尝试解决 exceptionSorter com.alibaba.druid.pool.vendor.MySqlExceptionSorter
xaDataSource.setMaintenanceInterval(28000);
xaDataSource.setTestQuery("SELECT 1");
xaDataSource.setPoolSize(5);
xaDataSource.setXaProperties(prop);
return xaDataSource;
}

@Bean(name = "slaveDataSource")
public DataSource slaveDataSource(Environment env) {
String sourceName = "slave";
Properties prop = build(env, "spring.datasource.druid.slave.");
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
//druid的数据库驱动换成xa的
xaDataSource.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
xaDataSource.setUniqueResourceName(sourceName);
//下面两行尝试解决 exceptionSorter com.alibaba.druid.pool.vendor.MySqlExceptionSorter
xaDataSource.setMaintenanceInterval(28000);
xaDataSource.setTestQuery("SELECT 1");
xaDataSource.setPoolSize(5);
xaDataSource.setXaProperties(prop);
return xaDataSource;

}

private Properties build(Environment env, String prefix) {

Properties prop = new Properties();
prop.put("url", env.getProperty(prefix + "url"));
prop.put("username", env.getProperty(prefix + "username"));
prop.put("password", env.getProperty(prefix + "password"));
prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
//这里只设置了简单的几个属性,如果想做更多的配置可以继续往下添加即可
return prop;
}

/**
* 动态数据源,在这继续添加 DataSource Bean
*/
@Bean(name = "dynamicDataSource")
@Primary
public DynamicDataSource dataSource(@Qualifier("masterDataSource") DataSource masterDataSource, @Nullable @Qualifier("slaveDataSource") DataSource slaveDataSource) {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceType.MASTER.name(), masterDataSource);
if (slaveDataSource != null){
targetDataSources.put(DataSourceType.SLAVE.name(), slaveDataSource);
}
// 还有数据源,在targetDataSources中继续添加
return new DynamicDataSource(masterDataSource, targetDataSources);
}

@Bean(name = "sqlSessionFactory")
@Primary
public SqlSessionFactory sqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dataSource)
throws Exception {
//参照的别人的代码说需要将会话工厂改成mybatis-plus的sql会话工厂,
//经测试发现使用mybatis的会话工厂也可以运行,不会报错
// MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
//使用了PackagesSqlSessionFactoryBean继承SqlSessionFactoryBean,重写了配置别名的方法
PackagesSqlSessionFactoryBean bean = new PackagesSqlSessionFactoryBean();
bean.setPlugins(new Interceptor[]{prepareInterceptor});
bean.setDataSource(dataSource);
//设置多数据源分布式事务
bean.setTransactionFactory(new MultiDataSourceTransactionFactory());
bean.setVfs(SpringBootVFS.class);
bean.setTypeAliasesPackage("com.zt");//通配符设置包别名
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/**/*Mapper.xml"));// 扫描指定目录的xml
return bean.getObject();
}

@Bean(name = "sqlSessionTemplate")
@Primary
public SqlSessionTemplate sqlSessionTemplate(
@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}

4.事务相关的处理

package com.zt.common.transaction;

import com.alibaba.druid.support.logging.Log;
import com.alibaba.druid.support.logging.LogFactory;
import com.zt.common.datasource.DynamicDataSourceContextHolder;
import org.apache.ibatis.transaction.Transaction;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.datasource.DataSourceUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.commons.lang3.Validate.notNull;

/**
* <P>多数据源切换,支持事务</P>
* <P>多数据源事务管理器是:根据数据源的不同类型,动态获取数据库连接,而不是从原来的缓存中获取导致数据源没法切换</P>
* @author lyh
*/
public class MultiDataSourceTransaction implements Transaction {
private static final Log LOGGER = LogFactory.getLog(MultiDataSourceTransaction.class);

private final DataSource dataSource;

private Connection mainConnection;

private String mainDatabaseIdentification;

private ConcurrentMap<String, Connection> otherConnectionMap;


private boolean isConnectionTransactional;

private boolean autoCommit;


public MultiDataSourceTransaction(DataSource dataSource) {
notNull(dataSource, "No DataSource specified");
this.dataSource = dataSource;
otherConnectionMap = new ConcurrentHashMap<>();
mainDatabaseIdentification= DynamicDataSourceContextHolder.getDateSoureType();
}


/**
* 开启事务处理方法
*/
@Override
public Connection getConnection() throws SQLException {
String databaseIdentification = DynamicDataSourceContextHolder.getDateSoureType();
if (null==databaseIdentification||databaseIdentification.equals(mainDatabaseIdentification)) {
if (mainConnection != null) return mainConnection;
else {
openMainConnection();
mainDatabaseIdentification =databaseIdentification;
return mainConnection;
}
} else {
if (!otherConnectionMap.containsKey(databaseIdentification)) {
try {
Connection conn = dataSource.getConnection();
otherConnectionMap.put(databaseIdentification, conn);
} catch (SQLException ex) {
throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
}
}
return otherConnectionMap.get(databaseIdentification);
}

}


private void openMainConnection() throws SQLException {
this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.mainConnection.getAutoCommit();
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"JDBC Connection ["
+ this.mainConnection
+ "] will"
+ (this.isConnectionTransactional ? " " : " not ")
+ "be managed by Spring");
}
}

/**
* 提交处理方法
*/
@Override
public void commit() throws SQLException {
if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Committing JDBC Connection [" + this.mainConnection + "]");
}
this.mainConnection.commit();
for (Connection connection : otherConnectionMap.values()) {
connection.commit();
}
}
}

/**
* 回滚处理方法
*/
@Override
public void rollback() throws SQLException {
if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Rolling back JDBC Connection [" + this.mainConnection + "]");
}
this.mainConnection.rollback();
for (Connection connection : otherConnectionMap.values()) {
connection.rollback();
}
}
}

/**
* 关闭处理方法
*/
@Override
public void close() throws SQLException {
DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource);
for (Connection connection : otherConnectionMap.values()) {
DataSourceUtils.releaseConnection(connection, this.dataSource);
}
}

@Override
public Integer getTimeout() throws SQLException {
return null;
}
}
package com.zt.common.transaction;


import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.Transaction;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;

import javax.sql.DataSource;

/**
 * Transaction factory that supports switching data sources within a service call
 * by handing out {@link MultiDataSourceTransaction} instances.
 *
 * @author lyh
 */
public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {

    /**
     * Creates the transaction for the given data source.
     *
     * @param dataSource data source the transaction obtains its connections from
     * @param level      not used by this factory
     * @param autoCommit not used by this factory
     * @return a new {@link MultiDataSourceTransaction}
     */
    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
        final Transaction multiSourceTransaction = new MultiDataSourceTransaction(dataSource);
        return multiSourceTransaction;
    }
}
package com.zt.common.transaction;

import org.apache.commons.lang3.StringUtils;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.core.type.classreading.CachingMetadataReaderFactory;
import org.springframework.core.type.classreading.MetadataReader;
import org.springframework.core.type.classreading.MetadataReaderFactory;
import org.springframework.util.ClassUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * {@link SqlSessionFactoryBean} whose {@code typeAliasesPackage} accepts wildcard package paths.
 *
 * <p>The stock setter does not expand wildcards, so this subclass scans the classpath for
 * classes under the given package pattern and passes the concrete package names on.
 *
 * Create by lyh
 */
public class PackagesSqlSessionFactoryBean extends SqlSessionFactoryBean {

    private static final Logger logger = LoggerFactory.getLogger(PackagesSqlSessionFactoryBean.class);

    static final String DEFAULT_RESOURCE_PATTERN = "**/*.class";

    /**
     * Expands the given package pattern into the list of packages that actually contain
     * classes and registers that list as the type-alias packages.
     *
     * <p>Package names are derived from the class names read from the class files, so the
     * scanned classes are not loaded or initialized. Each package is listed once. When
     * nothing matches, or the scan fails with an I/O error, the aliases are left unset and
     * the problem is logged.
     *
     * @param typeAliasesPackage package name, optionally containing wildcards (for example {@code com.zt})
     */
    @Override
    public void setTypeAliasesPackage(String typeAliasesPackage) {
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        MetadataReaderFactory metadataReaderFactory = new CachingMetadataReaderFactory(resolver);
        // classpath*:<package as path>/**/*.class
        String scanPattern = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX
                + ClassUtils.convertClassNameToResourcePath(typeAliasesPackage) + "/" + DEFAULT_RESOURCE_PATTERN;

        try {
            List<String> packages = new ArrayList<String>();
            for (Resource resource : resolver.getResources(scanPattern)) {
                if (!resource.isReadable()) {
                    continue;
                }
                MetadataReader metadataReader = metadataReaderFactory.getMetadataReader(resource);
                String packageName = ClassUtils.getPackageName(metadataReader.getClassMetadata().getClassName());
                // Many classes share a package; keep the first occurrence only.
                if (!packageName.isEmpty() && !packages.contains(packageName)) {
                    packages.add(packageName);
                }
            }
            if (packages.isEmpty()) {
                logger.warn("参数typeAliasesPackage:"+scanPattern+",未找到任何包");
            } else {
                super.setTypeAliasesPackage(String.join(",", packages));
            }
        } catch (IOException e) {
            // Best effort, as before: the bean stays usable without aliases, but the failure is logged.
            logger.error("Failed to scan type alias packages for pattern " + scanPattern, e);
        }
    }

}
package com.zt.common.transaction;


import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

/**
 * Distributed (JTA) transaction manager configuration.
 * Lets the transactions of several data sources roll back together when an operation fails.
 * Create by lyh
 */
@Configuration
public class XATransactionManagerConfig {

    /**
     * Atomikos user transaction with its transaction timeout set to 10000.
     */
    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp atomikosUserTransaction = new UserTransactionImp();
        atomikosUserTransaction.setTransactionTimeout(10000);
        return atomikosUserTransaction;
    }

    /**
     * Atomikos transaction manager with forced shutdown turned off.
     */
    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() throws Throwable {
        UserTransactionManager atomikosManager = new UserTransactionManager();
        atomikosManager.setForceShutdown(false);
        return atomikosManager;
    }

    /**
     * Spring JTA transaction manager built from the two Atomikos beans above.
     */
    @Bean(name = "transactionManager")
    @DependsOn({ "userTransaction", "atomikosTransactionManager" })
    public PlatformTransactionManager transactionManager() throws Throwable {
        UserTransaction jtaUserTransaction = userTransaction();
        TransactionManager jtaTransactionManager = atomikosTransactionManager();
        return new JtaTransactionManager(jtaUserTransaction, jtaTransactionManager);
    }
}
 

补充说明:数据源配置中的下面两行用于尝试解决 exceptionSorter com.alibaba.druid.pool.vendor.MySqlExceptionSorter 相关的问题:

xaDataSource.setMaintenanceInterval(28000);
xaDataSource.setTestQuery("SELECT 1");

原文地址:https://www.cnblogs.com/itliyh/p/13251572.html