栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

当mybatis-plus-dynamicdatasource遇到@Transactional

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

当mybatis-plus-dynamicdatasource遇到@Transactional

事务生效,但数据源切换失败。

目录
      • 1. 前言
      • 2. 解决方案
      • 3. 最佳实践
      • 4. 原理分析
        • 4.1 相关类
      • 5. 参考

1. 前言

问题:使用mybatis-plus-dynamicdatasource实现数据源切换操作功能,在遇到同时借助spring-tx进行事务处理时,会发现数据源无法正确地切换到辅库。

能点开这篇博客的基本都是奔着解决方案来的,所以以下我们先介绍解决方案和相应的最佳实践;最后再分析下原因,过下相关源代码。

2. 解决方案

以下直接以代码形式,同时展示问题场景和解决方案。

@Service
public class TestService {
 	// 主库
    @Autowired
    private MasterxMapper masterMapper;
    
    // 辅库 
    @Autowired
    private slaveMapper slaveMapper; 
 
    //启用事务注解
    @Transactional
    public void test(){
        Console.log("main logic");
         
        User user = new User();
        user.setUuid(IdUtil.simpleUUID());
        user.setXxx("12332112311");
        // 主库相关操作
        masterMapper.insert(user);
         
        // 辅库操作, 注意这里的写法; 这是为了让Spring AOP生效
        getService().slaveLook();
    }
     
    // 声明本方法内,以非事务方式运行,以实现数据源切换
    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    // 声明本方法内, 我们将要操作的是副库
    @DS("slave")
    // 必须为public ;;; 参见 AnnotationTransactionAttributeSource.allowPublicMethodsonly()
    public void slaveLook(){
        // 操作辅库
        Console.log(slaveMapper.selectById(123));
         
        // 如果抛出异常,整个调用链路中的主库操作将回滚,对应本例中是 masterMapper.insert(user);
        //throw new RuntimeException();
         
    }
     
    private TestService getService(){
        // 从Spring容器中取出本实例
        return SpringUtil.getBean(this.getClass());
    }
3. 最佳实践
  1. 再次提醒,副库并不会获得事务的支持。如果你需要同时支持辅库上的修改操作,请使用Atomikos等分布式事务。

  2. @DS建议打在Service上,这是源自mybatis-plus-dynamicdatasource官方的建议。

  3. 上面的getService()方法可以考虑抽取为一个接口,在Java8的默认接口方法实现的支持下,这将是个非常简单的事情。

    public interface SelfAop {
    	default T getSelf() {
    		return ((T)SpringUtil.getBean(this.getClass()));
    
    	//	ParameterizedType currentType = Stream.of(getClass().getGenericInterfaces()).map(s -> (ParameterizedType) s)
    	//		.filter(s -> s.getRawType() == SelfAop.class).findFirst().get();
    	//
    	//	@SuppressWarnings("unchecked")		
    	//	return SpringUtil.getBean((Class)currentType.getActualTypeArguments()[0]);
    	}
    }
    
4. 原理分析

以上TestService.test() 执行时,将获得如下堆栈:

// ==================================== 1. DataSourceTransactionManager.java
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	Connection con = null;

	try {
		if (!txObject.hasConnectionHolder() ||
				txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
			Connection newCon = obtainDataSource().getConnection();
			if (logger.isDebugEnabled()) {
				logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
			}
			txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
		}

		txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
		con = txObject.getConnectionHolder().getConnection();

		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
		txObject.setPreviousIsolationLevel(previousIsolationLevel);
		txObject.setReadOnly(definition.isReadOnly());

		// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
		// so we don't want to do it unnecessarily (for example if we've explicitly
		// configured the connection pool to set it already).
		if (con.getAutoCommit()) {
			txObject.setMustRestoreAutoCommit(true);
			if (logger.isDebugEnabled()) {
				logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
			}
			con.setAutoCommit(false);
		}

		prepareTransactionalConnection(con, definition);
		txObject.getConnectionHolder().setTransactionActive(true);

		int timeout = determineTimeout(definition);
		if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
			txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
		}
		// 绑定到当前线程上下文, 
		//  在 DataSourceUtils.doGetConnection(dataSource) 中用到; 呼应上面的调用堆栈
		// Bind the connection holder to the thread.
		if (txObject.isNewConnectionHolder()) {
			// 下方截图; 实现细节参见下方源码
			TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
		}
	}

	catch (Throwable ex) {
		if (txObject.isNewConnectionHolder()) {
			DataSourceUtils.releaseConnection(con, obtainDataSource());
			txObject.setConnectionHolder(null, false);
		}
		throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
	}
}

// ==================================== 2. TransactionSynchronizationManager.java
// 注意这是一个静态方法
public static void bindResource(Object key, Object value) throws IllegalStateException {
	Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
	Assert.notNull(value, "Value must not be null");
	Map map = resources.get();
	// set ThreadLocal Map if none found
	if (map == null) {
		map = new HashMap<>();
		// resources类型为: ThreadLocal>
		resources.set(map);
	}
	// 本例中:
	//  1. actualKey实际类型为: com.baomidou.dynamic.datasource.DynamicRoutingDataSource
	//  2. value实际类型为: org.springframework.jdbc.datasource.ConnectionHolder
	Object oldValue = map.put(actualKey, value);
	// Transparently suppress a ResourceHolder that was marked as void...
	if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
		oldValue = null;
	}
	if (oldValue != null) {
		throw new IllegalStateException(
				"Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
	}
}

// ==================================== 3. DataSourceUtils
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
	Assert.notNull(dataSource, "No DataSource specified");

	ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
	if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
		//主库逻辑执行完毕,再次进行副库操作时,逻辑将走这条分支
		conHolder.requested();
		if (!conHolder.hasConnection()) {
			logger.debug("Fetching resumed JDBC Connection from DataSource");
			conHolder.setConnection(fetchConnection(dataSource));
		}
		return conHolder.getConnection();
	}
	// Else we either got no holder or an empty thread-bound holder here.
	// ---- 从DataSource中获取Connection, 想要触发@DS, 必须走到这里
	logger.debug("Fetching JDBC Connection from DataSource");
	Connection con = fetchConnection(dataSource);

	if (TransactionSynchronizationManager.isSynchronizationActive()) {
		......
	}

	return con;
}

/ 针对以上 DataSourceUtils.doGetConnection(DataSource dataSource), 与Mybatis相关的堆栈如下:
SimpleExecutor.prepareStatement()
	baseExecutor.getConnection(Log statementLog)
		transaction.getConnection();  // Connection是从事务中获取到的,实际类型为 SpringManagedTransaction
			DataSourceUtils.doGetConnection(DataSource dataSource) // 如果ThreadLocal中发现存在Connect, 直接返回
4.1 相关类
  1. SpringTransactionAnnotationParser。 启动阶段解析@Transactional
  2. TransactionInterceptor // 执行阶段 @Transactional
  3. DynamicDataSourceAnnotationInterceptor // 针对注解 @DS
5. 参考
  1. spring boot学习7之mybatis+mysql读写分离(一写多读)+事务
  2. SpringBoot事务隔离等级和传播行为
  3. dynamic-datasource切换数据源失败
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/695265.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号