栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

seata源码解析:一个GlobalTransaction注解是如何开启分布式事务的?

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

seata源码解析:一个GlobalTransaction注解是如何开启分布式事务的?

介绍

我们都知道seata中有三种角色,TM,RM,TC。那么如何区分一个应用是RM还是TM呢?这个问题估计你很快就能解答出来,全局事务的发起者是TM,全局事务的执行者是RM。

如何从代码中确定全局事务的发起者呢?只要这个方法加了@GlobalTransactional就是全局事务的发起者,不管你使用的哪种模式。

本篇文章我们就来分析一下为什么一个@GlobalTransactional注解就能开启分布式事务

给加了@GlobalTransactional,@GlobalLock,@TwoPhaseBusinessAction注解的方法生成代理类

// GlobalTransactionScanner
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            // 已经处理过就不在处理
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy
            // 是否是tcc模式的代理
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)interceptor);
            } else {
                Class serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // 目标类或者目标接口上不存在 @GlobalTransactional 或者 @GlobalLock
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }

                if (globalTransactionalInterceptor == null) {
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(
                        ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }

            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            // 如果是普通的bean,走父类的方法生成代理类即可
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // 如果已经是代理类,获取到advisor后,添加到该集合即可
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                // 根据上面的interceptor生成advisor
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

当方法上有@GlobalTransactional注解或者@GlobalLock注解时,会执行到GlobalTransactionalInterceptor#invoke方法

// GlobalTransactionalInterceptor
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    // 执行代理逻辑
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            // 如果有@GlobalTransactional注解
            if (globalTransactionalAnnotation != null) {
                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
            } else if (globalLockAnnotation != null) {
                // 如果有@GlobalLock注解,用于非at模式的数据库加锁
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed();
}
@GlobalTransaction增强逻辑

执行TransactionalTemplate#handleGlobalTransaction方法,传入的参数为TransactionalExecutor的实现类,可以看到这个是一个匿名实现类

// TransactionalTemplate
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
            	// 执行被代理的方法
                return methodInvocation.proceed();
            }

            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }

            
            @Override
            public TransactionInfo getTransactionInfo() {
                // reset the value of timeout
                int timeout = globalTrxAnno.timeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }

                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
                Set rollbackRules = new linkedHashSet<>();
                for (Class rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        TransactionalExecutor.Code code = e.getCode();
        switch (code) {
            case RollbackDone:
                throw e.getOriginalException();
            case BeginFailure:
                succeed = false;
                failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case CommitFailure:
                succeed = false;
                failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackFailure:
                failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            case RollbackRetrying:
                failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                throw e.getOriginalException();
            default:
                throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
        }
    } finally {
        if (degradeCheck) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}
// TransactionalTemplate
public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    // 获取当前全局事务,如果没有则创建
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // 获取事务传播行为
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // 开始事务
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // 提交事务
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
  1. 首先获取TransactionInfo,里面定义了事务的各种属性。
  2. 接着获取当前全局事务,如果没有则创建
  3. 然后是对事务传播行为的支持,可以看到和Spring中定义的事务传播行为差不多
  4. 接着就可以看到事务提交的模版方法,开始事务,执行业务逻辑,提交事务,或者回滚事务
@GlobalLock增强逻辑 参考博客
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/298782.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号