基于Spring读写分离

为什么是基于Spring的呢,因为实现方案基于Spring的事务以及AbstractRoutingDataSource(spring中的一个基础类,可以在其中放多个数据源,然后根据一些规则来确定当前需要使用哪个数据,既可以进行读写分离,也可以用来做分库分表)

我们只需要实现


 determineCurrentLookupKey()

每次生成jdbc connection时,都会先调用该方法来甄选出实际需要使用的datasource,由于这个方法并没有参数,因此,最佳的方式是基于ThreadLocal变量


 @Component
@Primary
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {

    @Resource(name = "masterDs")
    private DataSource masterDs;

    @Resource(name = "slaveDs")
    private DataSource slaveDs;

    @Override
    public void afterPropertiesSet() {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("slaveDs", slaveDs);
        targetDataSources.put("masterDs", masterDs);
        this.setTargetDataSources(targetDataSources);
        this.setDefaultTargetDataSource(masterDs);
        super.afterPropertiesSet();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return DynamicDataSourceHolder.contextHolder.get();
    }


    /**
     * 持有当前线程所有数据源的程序
     */
    public static class DynamicDataSourceHolder {
        public static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

        public static void setWrite() {
            contextHolder.set("masterDs");
        }

        public static void setRead() {
            contextHolder.set("slaveDs");
        }

        public static void remove() {
            contextHolder.remove();
        }

    }
}

DynamicRoutingDataSource 仅仅是一个DataSource实现,更高层的类主要用它来创建连接,如getConnection(),getConnection会先寻找实际的datasource,在创建连接


	@Override
	public Connection getConnection() throws SQLException {
		return determineTargetDataSource().getConnection();
	}

以上代码,使用了ThreadLocal contextHolder来存取当前需要的datasource的loopkey
contextHolder上的值可能没有被初始化,此时contextHolder.get() 等于 null,此时会使用默认的datasource,我们设置的默认值对应的是主库

  
	/**
	 * Retrieve the current target DataSource. Determines the
	 * {@link #determineCurrentLookupKey() current lookup key}, performs
	 * a lookup in the {@link #setTargetDataSources targetDataSources} map,
	 * falls back to the specified
	 * {@link #setDefaultTargetDataSource default target DataSource} if necessary.
	 * @see #determineCurrentLookupKey()
	 */
	protected DataSource determineTargetDataSource() {
		Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
		Object lookupKey = determineCurrentLookupKey();
		DataSource dataSource = this.resolvedDataSources.get(lookupKey);
		if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
			dataSource = this.resolvedDefaultDataSource;
		}
		if (dataSource == null) {
			throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
		}
		return dataSource;
	}

下一步,是在sql请求前,先对contextHolder赋值,手动赋值工作量很大,并且容易出错,也没有办法规范

实现方案是基于Aop,根据方法名,或者方法注解来区分

  • 根据方法名的好处比较明显,只要工程的managerdao层方法名称规范就行,不用到处添加注解,缺点是不精准
  • 根据注解来区分,缺点是需要在很多个类或接口上加注解,优点是精准

如果有特殊的业务,可以两种情况都使用,如以下场景:

  • 一个大的service,先后调用多个manager层dao层sql,先insert、后query,并且想直接查主库,也就是写后立即查,由于mysql主从同步可能会有延迟,写后立即查可能会读到老数据,写后立即查的情况比较复杂,如果不是事务的话,实现其实比较复杂,如何在非事务场景下,让两个顺序执行的请求,保持同一个connection,需单独调研,根据实际情况进行修改

我们将设置contextHolder的地方加在了dao层,出于以下考量:

  • 透过manager层,直接调用dao时,无风险
  • 当前工程采用的是手动在manager层开启事务,开启事务lookupKey一定为null,采用默认的masterDs,没有问题,事务开启链接后,dao层的每个被调用的方法,会使用事务中的链接(由Spring Transaction控制)
  • 如果在manager层开启事务,manager层的方法名可能不规范,dao层是最接近sql请求的地方,规范更容易遵循

当然,这不是最佳实践,如果在manager层做这个事,也是可以的,看具体的情况,要求是,名称或注解表意为query的方法,里面不能做任何更新操作,因为manager层已经确定了它会查从库,以下方法是会执行失败的


public class Manager1{

  ......
  public List getSome(params){
       dao1.getRecord1(params);
       dao2.updateLog(params);
  }

}

因为dao2是一个更新请求,使用从库进行更新,肯定是会失败的
(使用Spring Boot,或Spring时,需确保开启AspectJ,Spring Boot开启方式 @EnableAspectJAutoProxy(proxyTargetClass = true)

aop示例

 @Aspect
@Order(-10)
@Component
public class DataSourceAspect {

    public static final Logger logger = LoggerFactory.getLogger(DataSourceAspect.class);

    private static final String[] DefaultSlaveMethodStart
            = new String[]{"query", "find", "get", "select", "count", "list"};

    /**
     * 切入点,所有的mapper pcakge下面的类的方法
     */
    @Pointcut(value = "execution(* com.xx.xx.dao.mapper..*.*(..))")
    @SuppressWarnings("all")
    public void pointCutTransaction() {
    }

    /**
     * 根据方法名称,判断是读还是写
     * @param jp
     */
    @Before("pointCutTransaction()")
    public void doBefore(JoinPoint jp) {
        String methodName = jp.getSignature().getName();
        if (isReadReq(methodName)) {
            DynamicRoutingDataSource.DynamicDataSourceHolder.setRead();
        } else {
            DynamicRoutingDataSource.DynamicDataSourceHolder.setWrite();
        }
    }

    /**
     * 方法结束 finally 时执行
     * @param jp
     */
    @After("pointCutTransaction()")
    public void after(JoinPoint jp) {
        DynamicRoutingDataSource.DynamicDataSourceHolder.remove();
    }

    /**
     * 根据方法名,判断是否为读请求
     *
     * @param methodName
     * @return
     */
    private boolean isReadReq(String methodName) {
        for (String start : DefaultSlaveMethodStart) {
            if (methodName.startsWith(start)) {
                return true;
            }
        }
        return false;
    }
}

上面的代码,根据方法名称前缀,反向判断哪些方法应该使用从库,凡是不匹配的方法,都走主库
方法结束后,必须清空contextHolder,否则他可能发生混乱,如这里是manager层 call dao层,dao层退出执行后,不清空contextHolder,则manager层开启事务时,会直接使用dao的值,如果这个请求是query,分配给从库了,那么manager层开启事务时就用的是从库了,结果可想而知

如此就完成了读写分离

spring 事务

当前使用的是编程声明式事务

	@Override
	public <T> T execute(TransactionCallback<T> action) throws TransactionException {
		if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
			return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
		}
		else {
			TransactionStatus status = this.transactionManager.getTransaction(this);
			T result;
			try {
				result = action.doInTransaction(status);
			}
			catch (RuntimeException ex) {
				// Transactional code threw application exception -> rollback
				rollbackOnException(status, ex);
				throw ex;
			}
			catch (Error err) {
				// Transactional code threw error -> rollback
				rollbackOnException(status, err);
				throw err;
			}
			catch (Throwable ex) {
				// Transactional code threw unexpected exception -> rollback
				rollbackOnException(status, ex);
				throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
			}
			this.transactionManager.commit(status);
			return result;
		}
	}

来发起事务,execute方法中

    this.transactionManager.getTransaction(this)

将获取事务需要的链接,其内部会读取ThreadLocal变量,判断是否有事务连接,如果已有,或允许嵌套事务,则会重复利用当前事务链接
如果当前请求已经被包含到了事务中,则根据策略,判断是否新开一个事务或不使用事务,它们的方法基本相同:通过创建一个新的连接来处理,并将当前已被打开的事务先挂起,等待当前操作执行结束后,再恢复外部事务,继续执行
TransactionSynchronizationManager类记录了这些ThreadLocal变量,允许将事务bind到其resources中,DatasourceTransactionManager则会触发这些操作


	@Override
	protected Object doSuspend(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		txObject.setConnectionHolder(null);
		return TransactionSynchronizationManager.unbindResource(this.dataSource);  //挂起事务的基本原理:将外部事务放到新建事务(可能是非事务)的suspendedResources上来进行挂起
	}

	@Override
	protected void doResume(Object transaction, Object suspendedResources) {
		TransactionSynchronizationManager.bindResource(this.dataSource, suspendedResources);  //将suspendedResources重新绑定到threadLocal变量
	}

原文地址:https://www.cnblogs.com/windliu/p/8920467.html