栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spring AOP事务实现原理之事务管理器TransactionManager

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring AOP事务实现原理之事务管理器TransactionManager

流程图

该图介绍了事务的传播行为

该流通图展示的是TransactionManager具体如何结合事务的传播行为进行事务的管理

TransactionManager
public interface TransactionManager {}
PlatformTransactionManager

这是 Spring 事务基础类中的中心接口。

对于实现者,建议从提供的 {@link org.springframework.transaction.support.AbstractPlatformTransactionManager} 类派生,该类实现了事务的传播行为并负责事务同步处理。该类使用了模板模式,子类必须为底层事务的特定状态实现模板方法,例如:begin, suspend, resume, commit。

该策略接口的默认实现是{@link org.springframework.transaction.jta.JtaTransactionManager}和{@link org.springframework.jdbc.datasource.DataSourceTransactionManager},可以作为其他事务策略的实现指南。

public interface PlatformTransactionManager extends TransactionManager {}
getTransaction(TransactionDefinition definition)

根据事务的传播特性,返回一个有效的事务或者创建一个新事务

    隔离级别、超时时间,只应用于新创建的事务并不是所有的事务定义的属性都会被每一个事务管理器支持,如果事务管理器不支持,应该抛异常上述规则的一个例外是只读标志,如果不支持显式只读模式,则忽略。
	TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException;
commit(TransactionStatus status)

提交给定的事务

    如果事务已被标记为仅回滚rollback-only,则执行回滚(对应rollback方法中的1)如果先前的事务已被挂起以便能够创建新事务,则在提交新事务后恢复先前的事务。当commit调用完成时,无论是正常还是抛出异常,事务都必须完全完成并清理。
	void commit(TransactionStatus status) throws TransactionException;
rollback(TransactionStatus status)

执行跟定事务的回滚操作

    如果事务不是新事务,只需将其设置为仅回滚(rollback-only)以正确参与外层事务。如果先前的事务已暂停以便能够创建新事务,则在回滚新事务后恢复先前的事务。如果事务commit时抛出异常,则不要在调用回滚;因为当commit返回时,事务已经completed和clean up,在AbstractPlatformTransactionManager#processRollback中会有cleanupAfterCompletion方法,并且在AbstractPlatformTransactionManager#commit中首先会检查事务状态,如果事务已经结束,则抛异常
	void rollback(TransactionStatus status) throws TransactionException;
AbstractPlatformTransactionManager

该抽象类实现了Spring 标准事务工作流,作为具体平台事务管理器的基础,例如 {@link org.springframework.transaction.jta.JtaTransactionManager}。

该类使用了模板模式,子类必须为底层事务的特定状态实现模板方法,例如:begin, suspend, resume, commit。

此基类提供以下工作流处理:

确定是否已经存在了一个事务应用适当的事务传播特性;必要时挂起和恢复事务;在commit时检查rollback-only标志;对rollback应用适当的修改(实际回滚或设置仅回滚标志rollback-only);触发已注册的同步回调(如果事务同步处于活动状态)。

事务同步是一种通用机制,用于注册在事务完成时调用的回调。

getTransaction

该方法处理事务的传播行为,委托给{@code doGetTransaction},{@code isExistingTransaction}

获取到Object transaction后,根据当前事务是否已经存在和事务的传播行为进行不同的处理

如果存在一个事务,则根据事务的传播属性进行相关处理处理如果不存在事务,但是有需要强制有事务MANDATORY,则抛异常如果不存在事务,但是需要有一个事务,则新创建一个事务,同时使用Object transaction开启事务。

	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {

        // 如果没有给定的TransactionDefinition就使用默认的TransactionDefinition
		TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

        //获取底层的数据库事务,该方法由子类实现
		Object transaction = doGetTransaction();

        //1.判断是否存在一个事务,该方法由子类实现
		if (isExistingTransaction(transaction)) {
            //如果存在一个事务,则检查事务的传播行为以了解行为方式,并创建一个TransactionStatus
			return handleExistingTransaction(def, transaction, debugEnabled);
		}

		// Check definition settings for new transaction.
		if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
		}

        //2.没有找到一个已经存在的事务
        //2.1 如果没有找到一个已经存在的事务,而且事务的传播特性是MANDATORY,则抛异常
		if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
        //2.2 如果没有找到一个已经存在的事务,而且事务的传播特性是REQUIRED,REQUIRES_NEW,NESTED,则创建新的事务
		else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			SuspendedResourcesHolder suspendedResources = suspend(null);
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //开启一个事务,该方法由子类实现
				doBegin(transaction, def);
				prepareSynchronization(status, def);
				return status;
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
        //创建“空”事务:没有实际事务,但可能同步。
		else {
			if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + def);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
		}
	}
handleExistingTransaction

为已经存在的事务创建一个TransactionStatus,也就是说如果当前已经存在了一个事务,则会调用该方法,根据事务的传播特性创建一个TransactionStatus。

	private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {

        //NEVER总是以非事务执行,不支持当前事务;如果存在事务,则抛出异常。
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}

        //NOT_SUPPORTED总是以非事务执行,不支持当前事务;如果存在事务,则挂起当前事务。
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            //创建一个TransactionStatus,newTransaction=false
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
		}

        //REQUIRES_NEW总是开启新事务;如果存在事务,则将这个存在的事务挂起
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                //创建一个新事务状态,newTransaction=false
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException | Error beginEx) {
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
		}

        //NESTED嵌套事务,创建一个嵌套事务
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException(
						"Transaction manager does not allow nested transactions by default - " +
						"specify 'nestedTransactionAllowed' property with value 'true'");
			}
			if (useSavepointForNestedTransaction()) {
                //创建一个TransactionStatus,newTransaction=false
				DefaultTransactionStatus status =
						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
				status.createAndHoldSavepoint();
				return status;
			}
			else {
				// Nested transaction through nested begin and commit/rollback calls.
				// Usually only for JTA: Spring synchronization might get activated here
				// in case of a pre-existing JTA transaction.
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, null);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
		}

		// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
		if (debugEnabled) {
			logger.debug("Participating in existing transaction");
		}
		if (isValidateExistingTransaction()) {
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
					Constants isoConstants = DefaultTransactionDefinition.constants;
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] specifies isolation level which is incompatible with existing transaction: " +
							(currentIsolationLevel != null ?
									isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
									"(unknown)"));
				}
			}
			if (!definition.isReadOnly()) {
				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" +
							definition + "] is not marked as read-only but existing transaction is");
				}
			}
		}
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}
prepareTransactionStatus

根据给定的参数创建一个新的TransactionStatus,同时适当地初始化事务同步

	protected final DefaultTransactionStatus prepareTransactionStatus(
			TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
			boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

        //创建一个DefaultTransactionStatus
		DefaultTransactionStatus status = newTransactionStatus(
				definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
		prepareSynchronization(status, definition);
		return status;
	}
newTransactionStatus

根据给定的参数创建一个TransactionStatus

	protected DefaultTransactionStatus newTransactionStatus(
			TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
			boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

		boolean actualNewSynchronization = newSynchronization &&
				!TransactionSynchronizationManager.isSynchronizationActive();
		return new DefaultTransactionStatus(
				transaction, newTransaction, actualNewSynchronization,
				definition.isReadOnly(), debug, suspendedResources);
	}

prepareSynchronization

适当的初始化事务同步

	protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
		if (status.isNewSynchronization()) {
			TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
					definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
							definition.getIsolationLevel() : null);
			TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
			TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
			TransactionSynchronizationManager.initSynchronization();
		}
	}
rollback(TransactionStatus status)

该方法根据事务的信息执行各种操作,比如回滚事务,或者设置rollbackOnly属性为true

核心方法

doRollback:调用具体的事务管理器进行事务回滚doSetRollbackOnly:将事务标为仅回滚,由外层事务进行回滚

	public final void rollback(TransactionStatus status) throws TransactionException {
        //检查事务状态是否为已结束,如果已经结束则会抛异常
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}

		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
		processRollback(defStatus, false);
	}
processRollback(DefaultTransactionStatus status, boolean unexpected)
	private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
		try {
			boolean unexpectedRollback = unexpected;

			try {
				triggerBeforeCompletion(status);

                //如果有savepoint,回滚至savepoint,只有一种情况
                //1.内层PROPAGATION_NESTED事务会开启保存点
				if (status.hasSavepoint()) {
					status.rollbackToHeldSavepoint();
				}
                //如果是新事务,则回滚事务,有两种情况
                //1.最外层事务,事务是PROPAGATION_REQUIRED
                //2.外层事务是PROPAGATION_REQUIRED,内层事务是PROPAGATION_REQUIRES_NEW
				else if (status.isNewTransaction()) {
                    //调用具体的事务管理器子类进行实际的事务回滚
					doRollback(status);
				}
                //如果有事务但不是新事务,则把标记事务状态为仅回滚,等到外层事务那一层进行回滚
                //而是当前事务方法参与到了外层事务中或者没有开启事务,比如
                //1.内层PROPAGATION_REQUIRED
                //2.内层PROPAGATION_SUPPORTS
                //3.内层PROPAGATION_MANDATORY,
                //这表示内外层方法处于同一个事务中,因此在内层事务设置rollback-only,最终会在外层进行事务回滚
				else {
					// 当前事务方法参与了一个更大的事务
					if (status.hasTransaction()) {
                        //
						if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            //设置rollbackOnly为true,等到外层事务的那一层进行回滚
							doSetRollbackOnly(status);
						}
					}
					// Unexpected rollback only matters here if we're asked to fail early
					if (!isFailEarlyOnGlobalRollbackOnly()) {
						unexpectedRollback = false;
					}
				}
			}
			catch (RuntimeException | Error ex) {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				throw ex;
			}

			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

			// Raise UnexpectedRollbackException if we had a global rollback-only marker
			if (unexpectedRollback) {
				throw new UnexpectedRollbackException(
						"Transaction rolled back because it has been marked as rollback-only");
			}
		}
        //清空记录的资源并将挂起的资源恢复
		finally {
			cleanupAfterCompletion(status);
		}
	}
commit(TransactionStatus status)
public final void commit(TransactionStatus status) throws TransactionException {
	   //1.如果事务已经完成了,则抛异常	
       if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}

		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
	   //2.如果事务状态表示事务需要回滚,则对事务进行回滚
       if (defStatus.isLocalRollbackOnly()) {
			processRollback(defStatus, false);
			return;
		}

        //3.
		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
			processRollback(defStatus, true);
			return;
		}
        //4.进行事务提交
		processCommit(defStatus);
	}
processCommit(DefaultTransactionStatus status)

处理实际的事务提交commit

	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;

			try {
				boolean unexpectedRollback = false;
				prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;

				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Releasing transaction savepoint");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					status.releaseHeldSavepoint();
				}
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction commit");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					doCommit(status);
				}
				else if (isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = status.isGlobalRollbackOnly();
				}

				// Throw UnexpectedRollbackException if we have a global rollback-only
				// marker but still didn't get a corresponding exception from commit.
				if (unexpectedRollback) {
					throw new UnexpectedRollbackException(
							"Transaction silently rolled back because it has been marked as rollback-only");
				}
			}
			catch (UnexpectedRollbackException ex) {
				// can only be caused by doCommit
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
				throw ex;
			}
			catch (TransactionException ex) {
				// can only be caused by doCommit
				if (isRollbackOnCommitFailure()) {
					doRollbackOnCommitException(status, ex);
				}
				else {
					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				}
				throw ex;
			}
			catch (RuntimeException | Error ex) {
				if (!beforeCompletionInvoked) {
					triggerBeforeCompletion(status);
				}
				doRollbackOnCommitException(status, ex);
				throw ex;
			}

			// Trigger afterCommit callbacks, with an exception thrown there
			// propagated to callers but the transaction still considered as committed.
			try {
				triggerAfterCommit(status);
			}
			finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}

		}
        //事务完成后进行cleanup
		finally {
			cleanupAfterCompletion(status);
		}
	}
DataSourceTransactionManager
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
		implements ResourceTransactionManager, InitializingBean {
    
	private DataSource dataSource;

	private boolean enforceReadonly = false;
    	public DataSourceTransactionManager() {
		setNestedTransactionAllowed(true);
	}

	public DataSourceTransactionManager(DataSource dataSource) {
		this();
		setDataSource(dataSource);
		afterPropertiesSet();
	}
}
doGetTransaction
	protected Object doGetTransaction() {
		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
		txObject.setSavepointAllowed(isNestedTransactionAllowed());
		ConnectionHolder conHolder =
				(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
		txObject.setConnectionHolder(conHolder, false);
		return txObject;
	}
isExistingTransaction
	protected boolean isExistingTransaction(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
	}
doBegin
	protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
			if (!txObject.hasConnectionHolder() ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
				Connection newCon = obtainDataSource().getConnection();
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}

			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();

			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			txObject.setPreviousIsolationLevel(previousIsolationLevel);
			txObject.setReadOnly(definition.isReadOnly());

			//将事务提交设置为手动,非自动
            if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				con.setAutoCommit(false);
			}

			prepareTransactionalConnection(con, definition);
			txObject.getConnectionHolder().setTransactionActive(true);

			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}

			// Bind the connection holder to the thread.
			if (txObject.isNewConnectionHolder()) {
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			if (txObject.isNewConnectionHolder()) {
				DataSourceUtils.releaseConnection(con, obtainDataSource());
				txObject.setConnectionHolder(null, false);
			}
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}

prepareTransactionalConnection

在事务begin的时候,准备一个事务Connection,

	protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)
			throws SQLException {
		if (isEnforceReadOnly() && definition.isReadOnly()) {
			try (Statement stmt = con.createStatement()) {
				stmt.executeUpdate("SET TRANSACTION READ ONLY");
			}
		}
	}
doSuspend
	@Override
	protected Object doSuspend(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		txObject.setConnectionHolder(null);
		return TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}
doResume
	@Override
	protected void doResume(@Nullable Object transaction, Object suspendedResources) {
		TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
	}
doCommit
	protected void doCommit(DefaultTransactionStatus status) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		Connection con = txObject.getConnectionHolder().getConnection();
		try {
			con.commit();
		}
		catch (SQLException ex) {
			throw new TransactionSystemException("Could not commit JDBC transaction", ex);
		}
	}
doRollback
	protected void doRollback(DefaultTransactionStatus status) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		Connection con = txObject.getConnectionHolder().getConnection();
		try {
			con.rollback();
		}
		catch (SQLException ex) {
			throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
		}
	}
doSetRollbackonly
	protected void doSetRollbackOnly(DefaultTransactionStatus status) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		txObject.setRollbackOnly();
	}
doCleanupAfterCompletion
	@Override
	protected void doCleanupAfterCompletion(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

		// Remove the connection holder from the thread, if exposed.
		if (txObject.isNewConnectionHolder()) {
			TransactionSynchronizationManager.unbindResource(obtainDataSource());
		}

		// Reset connection.
		Connection con = txObject.getConnectionHolder().getConnection();
		try {
			if (txObject.isMustRestoreAutoCommit()) {
				con.setAutoCommit(true);
			}
			DataSourceUtils.resetConnectionAfterTransaction(
					con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
		}
		catch (Throwable ex) {
			logger.debug("Could not reset JDBC Connection after transaction", ex);
		}

		if (txObject.isNewConnectionHolder()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
			}
			DataSourceUtils.releaseConnection(con, this.dataSource);
		}

		txObject.getConnectionHolder().clear();
	}
DataSourceTransactionObject

DataSource事务对象,表示一个ConnectionHolder。

被DataSourceTransactionManager当作一个transaction对象使用

	private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

		private boolean newConnectionHolder;

		private boolean mustRestoreAutoCommit;

		public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
			super.setConnectionHolder(connectionHolder);
			this.newConnectionHolder = newConnectionHolder;
		}

		public boolean isNewConnectionHolder() {
			return this.newConnectionHolder;
		}

		public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
			this.mustRestoreAutoCommit = mustRestoreAutoCommit;
		}

		public boolean isMustRestoreAutoCommit() {
			return this.mustRestoreAutoCommit;
		}

		public void setRollbackOnly() {
			getConnectionHolder().setRollbackOnly();
		}

		@Override
		public boolean isRollbackOnly() {
			return getConnectionHolder().isRollbackOnly();
		}

		@Override
		public void flush() {
			if (TransactionSynchronizationManager.isSynchronizationActive()) {
				TransactionSynchronizationUtils.triggerFlush();
			}
		}
	}
TransactionStatus
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {

    //返回此事务内部是否带有保存点savepoint,即是否已创建了基于保存点的嵌套事务。
	boolean hasSavepoint();
    
    //将底层session刷新到数据库(如果适用):例如,所有受影响的 Hibernate/JPA session。
	@Override
	void flush();
}
TransactionExecution

事务当前状态通用接口。

public interface TransactionExecution {

    //返回当前是否是否是一个新的事务,否则参与到一个已经存在的事务或者,或者一开始就没有在实际事务内运行
	boolean isNewTransaction();

    //设置事务rollback-only,指示事务管理器唯一的可能结果就是回滚事务
	void setRollbackOnly();

	boolean isRollbackOnly();

    //表示该事务是否结束,即是否已经commit或者roll back
	boolean isCompleted();
}
AbstractTransactionStatus

TransactionStatus抽象类,预先实现rollback-only 和completed标志的处理,以及对底层 {@link org.springframework.transaction.SavepointManager} 的委托。

还提供了在事务中保存保存点的选项。

public abstract class AbstractTransactionStatus implements TransactionStatus {

	private boolean rollbackonly = false;

	private boolean completed = false;

	@Nullable
	private Object savepoint;
    
    @Override
	public void setRollbackOnly() {
		this.rollbackonly = true;
	}
    @Override
	public boolean isRollbackOnly() {
		return (isLocalRollbackOnly() || isGlobalRollbackOnly());
	}
    	public void setCompleted() {
		this.completed = true;
	}
	public boolean isLocalRollbackOnly() {
		return this.rollbackOnly;
	}

	
	public boolean isGlobalRollbackOnly() {
		return false;
	}
    
	@Override
	public boolean isCompleted() {
		return this.completed;
	}
    //---------------------------------------------------------------------
	// 处理当前savepoint的状态
	//---------------------------------------------------------------------
	@Override
	public boolean hasSavepoint() {
		return (this.savepoint != null);
	}

    //为事务设置savepoint,对PROPAGATION_NESTED是有用的
	protected void setSavepoint(@Nullable Object savepoint) {
		this.savepoint = savepoint;
	}

	protected Object getSavepoint() {
		return this.savepoint;
	}

    //创建一个savepoint,并保存到事务中
	public void createAndHoldSavepoint() throws TransactionException {
		setSavepoint(getSavepointManager().createSavepoint());
	}

    //回滚到为事务保留的savepoint,然后立即释放savepoint
	public void rollbackToHeldSavepoint() throws TransactionException {
		Object savepoint = getSavepoint();
		if (savepoint == null) {
			throw new TransactionUsageException(
					"Cannot roll back to savepoint - no savepoint associated with current transaction");
		}
		getSavepointManager().rollbackToSavepoint(savepoint);
		getSavepointManager().releaseSavepoint(savepoint);
		setSavepoint(null);
	}

    //释放savepoint
	public void releaseHeldSavepoint() throws TransactionException {
		Object savepoint = getSavepoint();
		if (savepoint == null) {
			throw new TransactionUsageException(
					"Cannot release savepoint - no savepoint associated with current transaction");
		}
		getSavepointManager().releaseSavepoint(savepoint);
		setSavepoint(null);
	}

	//---------------------------------------------------------------------
	// Implementation of SavepointManager
	//---------------------------------------------------------------------

	
	@Override
	public Object createSavepoint() throws TransactionException {
		return getSavepointManager().createSavepoint();
	}

	
	@Override
	public void rollbackToSavepoint(Object savepoint) throws TransactionException {
		getSavepointManager().rollbackToSavepoint(savepoint);
	}

	
	@Override
	public void releaseSavepoint(Object savepoint) throws TransactionException {
		getSavepointManager().releaseSavepoint(savepoint);
	}

    //为底层的事务返回一个SavepointManager
    //默认是抛异常,子类可以进行覆盖
	protected SavepointManager getSavepointManager() {
		throw new NestedTransactionNotSupportedException("This transaction does not support savepoints");
	}
DefaultTransactionStatus
public class DefaultTransactionStatus extends AbstractTransactionStatus {

	@Nullable
	private final Object transaction;

	private final boolean newTransaction;

	private final boolean newSynchronization;

	private final boolean readOnly;

	private final boolean debug;

	@Nullable
	private final Object suspendedResources;

	public DefaultTransactionStatus(
			@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
			boolean readOnly, boolean debug, @Nullable Object suspendedResources) {

		this.transaction = transaction;
		this.newTransaction = newTransaction;
		this.newSynchronization = newSynchronization;
		this.readonly = readOnly;
		this.debug = debug;
		this.suspendedResources = suspendedResources;
	}

	public Object getTransaction() {
		Assert.state(this.transaction != null, "No transaction active");
		return this.transaction;
	}

	public boolean hasTransaction() {
		return (this.transaction != null);
	}

	@Override
	public boolean isNewTransaction() {
		return (hasTransaction() && this.newTransaction);
	}

	public boolean isNewSynchronization() {
		return this.newSynchronization;
	}

	public boolean isReadOnly() {
		return this.readOnly;
	}

	public boolean isDebug() {
		return this.debug;
	}

	@Nullable
	public Object getSuspendedResources() {
		return this.suspendedResources;
	}


	//---------------------------------------------------------------------
	// Enable functionality through underlying transaction object
	//---------------------------------------------------------------------

	
	@Override
	public boolean isGlobalRollbackOnly() {
		return ((this.transaction instanceof SmartTransactionObject) &&
				((SmartTransactionObject) this.transaction).isRollbackOnly());
	}

	@Override
	protected SavepointManager getSavepointManager() {
		Object transaction = this.transaction;
		if (!(transaction instanceof SavepointManager)) {
			throw new NestedTransactionNotSupportedException(
					"Transaction object [" + this.transaction + "] does not support savepoints");
		}
		return (SavepointManager) transaction;
	}

	public boolean isTransactionSavepointManager() {
		return (this.transaction instanceof SavepointManager);
	}

	@Override
	public void flush() {
		if (this.transaction instanceof SmartTransactionObject) {
			((SmartTransactionObject) this.transaction).flush();
		}
	}
}
扩展学习

Spring 的注解方式的事务实现机制Spring的BeanPostProcessor扩展点实现类AbstractAutoProxyCreatorSpring AOP创建代理类之JdkDynamicAopProxySpring 的注解方式的事务实现机制Spring的TransactionDefinition中定义的事务的传播特性和隔离级别详解Spring的事务管理PlatformTransactionManager

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/770408.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号