- 一、分布式事务是什么?
- 二、TC和TM
- 三、TC
- 3.1读TC的开始
- 3.2 TC的结构
- 3.3 一切的开始源于Aspect包
- 1)代理初始化DataSource连接池;
- 2)通过对扫描注解@LcnTransaction对所有service服务,实现代理;
- 3)对TM的监听
- 4)初始化连接TM
- 5)基于DDD优化后的切面设计
- 6)动态的执行步骤
- 四、LCN机制的核心
- 五、被代理的连接们
- 六、并不完美的数据最终一致性
- 总结
目前主流的分布式事务解决方案有很多种,主流的主要有:LCN、TCC、TXC三种模式,本文主要讲解LCN的原理。
想看TX-LCN官方文档请传送
[tx-lcn官方文档](https://www.codingapi.com/docs/txlcn-preface/) 一、分布式事务是什么?
分布式事务是基于服务微服务化之后引申出的事务问题;
在单体架构的服务中,通过本地事务一次性解决对数据库数据操作,通过本地事务实现数据的一致性,如下图左侧所示。
在微服务场景下就有可能出现A调用B服务的过程中,B服务成功了,A服务失败了,导致了最终AB数据不一致的情况,如下图右侧所示。
那么要解决分布式事务的问题,其实就是要考虑怎么实现数据的一致性,核心就是CAP定律 base理论,根据这个方向引申出了以下几种分布式事务解决方案:
1、TCC
TCC即Try、/confirm/i,Cancle,从翻译的字面意思理解其实就是这个解决方案的原理,对数据的提交和失败都做相应的业务补偿。
一般来说TCC都是指的一种解决方案,如果使用TX-LCN框架,更多指的是对/confirm/i的提交确认和Cancle的补偿机制,对补偿的业务代码逻辑性要求很高,这类方案其实还有很多需要升级的地方,后续有机会会补充讲解。
该模式对代码的嵌入性高,要求每个业务需要写三种步骤的操作。
该模式对有无本地事务控制都可以支持使用面广。
数据一致性控制几乎完全由开发者控制,对业务开发难度要求高。
2、LCN
LCN其实是对2PC模式的一种实现,即通过事务协调者来控制对业务事务的两段提交,和TXC其实算是一类模式,由TC和TM两个组件组成,TC负责客户端的事务代理和TM协作,TM负责管控事务的生命周期,以及记录异常日志。
该模式对代码的嵌入性为低。
该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。
该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。
该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。
3、TXC
TXC的实现原理是在执行SQL之前,先解析SQL,查询影响数据,然后保存执行的SQL快走信息和创建锁。当需要回滚的时候就采用这些记录数据回滚数据库,目前锁实现依赖redis分布式锁控制。
该模式与LCN模式类似同样对代码的嵌入性低。
该模式仅限于对支持SQL方式的模块支持。
该模式由于每次执行SQL之前需要先查询影响数据,因此相比LCN模式消耗资源与时间要多。
该模式不会占用数据库的连接资源。
LCN模式的设计思路是由TM和TC组成的,其中图示的LCN代理连接池就是TC对客户端的代理,我们需要对业务工程的本地连接池代理,即需要对业务代码做控制,该控制的实现在Tx-LCN的设计方案里叫TC。
三、TC 3.1读TC的开始读一个框架,首先要看的是官方文档,通过文档我们可以了解到Tx-Lcn框架整体是对Spring进行了天生依赖,版本是SpringBoot2.x,在后续实现的解释中,我们就可以通过Springboot的官方文档参照研究TC模块的设计模式。
TC模块在初始化过程中使用了很多@Configuration和@ConfigurationProperties的注解对配置进行管理。
例如:
1. 初始化Tx相关的信息
从这里其实可以通过解读该框架理解其他框架也是可以通过类似的方式,读取需要的用户配置。
2. 初始化对连接池和本地事务的代理:
3.2 TC的结构 3.3 一切的开始源于Aspect包 1)代理初始化DataSource连接池; 2)通过对扫描注解@LcnTransaction对所有service服务,实现代理;package com.codingapi.txlcn.tc.aspect;
import com.codingapi.txlcn.p6spy.CompoundJdbcEventListener;
import com.codingapi.txlcn.tc.config.TxConfig;
import com.codingapi.txlcn.tc.control.TransactionContext;
import com.codingapi.txlcn.tc.jdbc.JdbcTransactionDataSource;
import com.codingapi.txlcn.tc.resolver.AnnotationContext;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJexpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//初始化切点
@Configuration
public class AspectConfiguration {
//这里需要注意ConditionalOnMissingBean注解,该方法是用来构建唯一的事务切点上下文管理,为了保证事务的隔离性。
@Bean
@ConditionalOnMissingBean
public TransactionAspectContext transactionAspectManager(TransactionContext transactionContext,
AnnotationContext annotationContext) {
return new TransactionAspectContext(transactionContext, annotationContext);
}
//1、该方法具体实现了对所有带有@LcnTransaction注解的方法切点,即对本地事务代理的初始化过程。
//2、对@LcnTransaction注解的扫描方式涉及到一个@annotation的切面增强的用法。
//3、依赖注入了txTransactionInterceptor来实现具体的拦截方法
@Bean
public Advisor txTransactionAdvisor(TxTransactionInterceptor txTransactionInterceptor, TxConfig txConfig){
AspectJexpressionPointcut pointcut=new AspectJexpressionPointcut();
pointcut.setexpression(txConfig.getTransactionPointcut());
return new DefaultPointcutAdvisor(pointcut, txTransactionInterceptor);
}
//1、同样需要注意的是@ConditionalOnMissingBean注解,该方法是用来构建唯一的事务切点上下文管理,为了保证事务的隔离性。
//2、数据源切点实现很简单,直接通过TxConfig默认注入的execution(* javax.sql.DataSource.getConnection(..))包来获取切点
//3、依赖注入了txDataSourceInterceptor来实现具体的拦截方法
@Bean
public Advisor txDataSourceAdvisor(TxDataSourceInterceptor txDataSourceInterceptor, TxConfig txConfig){
AspectJexpressionPointcut pointcut=new AspectJexpressionPointcut();
pointcut.setexpression(txConfig.getDatasourcePointcut());
return new DefaultPointcutAdvisor(pointcut, txDataSourceInterceptor);
}
//TxTransaction具体的拦截器
@Bean
public TxTransactionInterceptor txTransactionInterceptor(TransactionAspectContext transactionAspectContext){
return new TxTransactionInterceptor(transactionAspectContext);
}
//DataSource具体的拦截器
@Bean
public TxDataSourceInterceptor txDataSourceInterceptor(CompoundJdbcEventListener compoundJdbcEventListener, JdbcTransactionDataSource jdbcTransactionDataSource){
return new TxDataSourceInterceptor(compoundJdbcEventListener,jdbcTransactionDataSource);
}
}
3)对TM的监听
TC通过初始化一个ScheduledThreadPoolExecutor的定时任务线程池,实现对TC节点的监听。
public TmServerRunner(TxConfig txConfig, ProtocolServer protocolServer,
SnowflakeStep snowFlakeStep, TxManagerReporter txManagerReporter) {
//1、tx的配置,具体包含了数据库类型、协议类型、TM节点信息等
this.txConfig = txConfig;
//2、使用的通讯服务初始化
this.protocolServer = protocolServer;
//3、初始化全局groupid生成器
this.snowFlakeStep = snowFlakeStep;
//4、与tm通讯的具体实现类
this.reporter = txManagerReporter;
//5、初始化监听tm的线程池
//6、需要注意的是这里用的是默认ScheduledThreadPoolExecutor,最大线程数是Integer.MAX_VALUE,建议根据实际场景切换自定义最大线程数处理。
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder().setNameFormat("tmServerRunner-pool-%d").build());
}
**
4)初始化连接TM**
public void init() {
try {
//1、通过异步的方式,tc在对tm不断获取连接并刷新本地唯一的groupid值
CompletableFuture futureTonotify = new CompletableFuture<>();
scheduledExecutorService.scheduleAtFixedRate(() -> {
List iNetSocketAddresses = ListUtil.isEmpty(txConfig.getINetSocketAddresses()) ?
txConfig.txManagerAddresses() : txConfig.getINetSocketAddresses();
iNetSocketAddresses.forEach(address -> {
protocolServer.connectTo(address.getHostString(), address.getPort(), futureToNotify);
futureToNotify.whenCompleteAsync((s, throwable) -> {
log.debug("=> futureToNotify.whenCompleteAsync");
//2、不断获取全局的唯一的GroupId并缓存
snowFlakeStep.getGroupIdAndLogId();
//3、尝试加入更多TM节点。
this.tryToGetMoreTmResource(iNetSocketAddresses);
});
});
}, 0, 30, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
5)基于DDD优化后的切面设计
package com.codingapi.txlcn.tc.aspect;
import com.codingapi.txlcn.tc.control.TransactionContext;
import com.codingapi.txlcn.tc.control.TransactionState;
import com.codingapi.txlcn.tc.control.TransactionStateStrategy;
import com.codingapi.txlcn.tc.info.TransactionInfo;
import com.codingapi.txlcn.tc.resolver.AnnotationContext;
import com.codingapi.txlcn.tc.resolver.TxAnnotation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInvocation;
import java.lang.reflect.Method;
// 这个类是对切点的具体实现
// 6.0开始作者对txlcn做了设计上的优化,抽象lcn、tcc、txc模式为动态对象。
@Slf4j
@AllArgsConstructor
public class TransactionAspectContext {
//事务管理器的上下文
//负责管理整个事务生命周期
private TransactionContext transactionContext;
//注解读取器的上下文
//在初始化过程中已经对添加@LcnTransaction注解的service实现了缓存
private AnnotationContext annotationContext;
public Object runWithTransaction(MethodInvocation invocation) throws Throwable {
Method targetMethod = invocation.getMethod();
// 1、读取被切点是否为LCN
TxAnnotation txAnnotation = annotationContext.getAnnotation(targetMethod);
if(txAnnotation==null){
return invocation.proceed();
}
// 2、事务状态判定
TransactionState transactionState = TransactionStateStrategy.getTransactionState();
TransactionInfo transactionInfo = TransactionInfo.current();
// 3、当transactionInfo == null表明是开始分布式事务
if(transactionInfo==null){
transactionInfo = new TransactionInfo(transactionState);
}
// 4、统一设置事务类型
transactionInfo.setTransactionType(txAnnotation.getType());
log.debug("run with tx-lcn start...");
Object res = null;
try {
// 5、尝试开始代理事务
transactionContext.tryBeginTransaction(transactionInfo);
res = invocation.proceed();
transactionInfo.setSuccessReturn(true);
}catch (Exception e){
transactionInfo.setSuccessReturn(false);
throw e;
}finally {
// 6、尝试结束代理事务
transactionContext.tryEndTransaction(transactionInfo);
// 7、尝试结束代理事务
transactionContext.clearTransaction();
}
log.debug("run with tx-lcn over.");
return res;
}
}
6)动态的执行步骤
package com.codingapi.txlcn.tc.control;
import com.codingapi.txlcn.tc.info.TransactionInfo;
import java.util.List;
import java.util.Optional;
public class TransactionStepContext {
private List transactionSteps;
public TransactionStepContext(List transactionSteps) {
this.transactionSteps = transactionSteps;
}
private Optional transactionStep(TransactionState type) {
for (TransactionStep transactionStep : transactionSteps) {
if (transactionStep.type().equals(type)) {
// 构建具体的transactionStep对象 例如TransactionStepCreate
return Optional.of(transactionStep);
}
}
return Optional.empty();
}
public void execute(TransactionInfo transactionInfo){
// 在具体执行事务的操作步骤时通过Optional的ifPresent方法通过lambda表达式动态执行步骤
Optional transactionStep = transactionStep(transactionInfo.getTransactionState());
// 翻译后为 TransactionStepCreate.run(transactionInfo)
transactionStep.ifPresent(step->step.run(transactionInfo));
}
}
四、LCN机制的核心
五、被代理的连接们
六、并不完美的数据最终一致性
总结



