- 分布式事务
- CAP理论
- XA方案
- TCC方案
- 可靠消息最终一致性方案
- 成熟开源的分布式事务框架
- MQ实现分布式事务Demo
分布式事务主要有以下五种解决方案
- XA方案,也叫两段式提交方案
- TCC方案,也叫三段式提交方案
- 可靠消息最终一致性方案
CAP理论中,C(Consistency)是强一致性,A(Availability)是可用性,P(Partition Tolerance)分区容错行。该理论在实际的分布式系统环境中,只能满足其中的两项,而不能同时满足这三个属性。
- 一致性:所有节点上的数据时刻保持同步
- 可用性:每个请求都应该受到相应,无论是成功还是失败
- 分区容错性:系统应该持续的提供服务,即使分区内有数据丢失
2PC其实就是保留一致性和分区容错性,而不考虑可用性。
在介绍2PC前,咱们应该了解一下XA规范。XA规范描述了全局事务管理与局部资源管理器之间的接口。XA规范的目的就是允许多个资源在同一个事务中访问,这样可以使得ACID特性实现跨多个应用而保持有效。
XA使用两阶段提交(2PC)来保证所有资源同时提交或者同时回滚。
正常情况下2PC提交流程:
异常情况下2PC提交流程
以上的2PC两阶段提交总结如下
- 第一阶段(提交请求阶段)
- 全局事务管理器向所有资源发送提交请求,询问是否可以执行提价操作,并等待所有资源的响应
- 各个资源节点执行节点执行事务管理器的事务询问请求,并将Undo信息和Redo信息写入日志中
- 各个资源节点响应全局事务管理器的询问请求。如果资源节点成功的执行了事务操作,那么就响应给全局事务管理器为"同意",如果资源节点执行事务操作失败,则响应给全局事务管理器为"中止"
- 第二阶段(提价执行阶段成功)
- 全局事务管理器开始向所有资源节点发送提交执行请求
- 所有资源节点收到全局事务管理器的提交请求后,资源节点开始执行事务提交
- 所有资源节点执行事务提交完成后,向全局事务管理器响应已完成消息
- 全局事务管理器收到所有资源节点的完成消息后,完成事务
- 第二阶段(提交执行阶段失败)
- 全局事务管理器向所有所有资源节点发出”回滚操作”的请求。
- 所有资源节点利用之前写入的Undo信息执行回滚,并释放在整个事务期间内占用的资源。
- 所有资源节点向全局事务管理器发送”回滚完成”消息。
- 全局事务管理器收到所有所有资源节点反馈的”回滚完成”消息后,取消事务。
有时候,第二阶段也被称作完成阶段,因为无论结果怎样,协调者都必须在此阶段结束当前事务。
TCC方案TCC方案可以分为三个阶段,分别为Try、/confirm/i、Cancel。
- Try阶段:这个阶段对各个服务的资源做检测以及对资源进行锁定后者预留
- /confirm/i阶段:这个阶段对各个服务执行实际的操作
- Cancel阶段:如果任何一个服务实际执行失败,就需要进行补偿,也就是对实际执行了真正操作的服务进行回滚。
以下以银行转账为例子描述TCC方案的大概流程:
- Try阶段:首先在设计表的时候,在账户表中多设计两个列。一个是预加字段,冻结字段。当指定Try阶段的时候,A给B转账1块钱,不直接进行账号的加减操作,而是首先在A的冻结字段上插入1,在B的预加字段上也插入1。以上的就是Try阶段的锁定资源或者预留资源。
- /confirm/i阶段:就是实际在A和B账户上进行实际的加减操作。A账户的账户余额字段减去冻结字段的值,B账户的余额字段加上预加字段的值。
- Cancel阶段:如果/confirm/i阶段任何一方执行失败了,则进行全部的回滚。就是比如A银行账户如果已经扣减了,但是B银行账户资金增加失败了,那么就得把A银行账户资金给加回去。
这个的意思,就是干脆不要用本地的消息表了,直接基于 MQ 来实现事务。比如阿里的 RocketMQ 或则RabbitMQ就支持消息事务。
大概的意思就是:
- A 系统先发送一个 prepared 消息到 mq,如果这个 prepared 消息发送失败那么就直接取消操作别执行了;
- 如果这个消息发送成功过了,那么接着执行本地事务,如果成功就告诉 mq 发送确认消息,如果失败就告诉 mq 回滚消息;
- 如果发送了确认消息,那么此时 B 系统会接收到确认消息,然后执行本地的事务;
- mq 会自动定时轮询所有 prepared 消息回调你的接口,问你,这个消息是不是本地事务处理失败了,所有没发送确认的消息,是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。
- 这个方案里,要是系统 B 的事务失败了咋办?重试咯,自动不断重试直到成功,如果实在是不行,要么就是针对重要的资金类业务进行回滚,比如 B 系统本地回滚后,想办法通知系统 A 也回滚;或者是发送报警由人工来手工回滚和补偿。
- 这个还是比较合适的,目前国内互联网公司大都是这么玩儿的,要不你举用 RocketMQ 支持的,要不你就自己基于类似 ActiveMQ?RabbitMQ?自己封装一套类似的逻辑出来,总之思路就是这样子的。
- ByteTCC
- TCC-transaction
- EasyTransaction
- Seata(比较火爆)
- 基于数据库XA/ JTA协议的方式:需要数据库厂商支持; JAVA组件有atomikos等
- 异步校对数据的方式:支付宝、微信支付主动查询支付状态、对账单的形式;
- 基于可靠消息(MQ)的解决方案:异步场景;通用性较强;拓展性较高
- TCC编程式解决方案:严选、阿里、蚂蚁金服自己封装的DTX
- MQ配置
import com.xjw.config.constant.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class OrderRabbitMqConfig {
@Bean
public Queue orderQueue() {
return new Queue(RabbitmqConstant.ORDER_QUEUE, true);
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(RabbitmqConstant.ORDER_EXCHANGE, true, false);
}
@Bean
public Binding bind() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(RabbitmqConstant.ORDER_ROUTING_KEY);
}
}
- 实体类
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.UUID;
@Getter
@Setter
public class MessageRecord {
private Long id;
private String businessId;
private int businessType;
private String messageId;
private int retriesNumber;
private int status;
private Date createTime;
public MessageRecord() {
}
public MessageRecord(String businessId, int businessType) {
this.businessId = businessId;
this.businessType = businessType;
this.messageId = UUID.randomUUID().toString().replace("-", "").toLowerCase();
this.retriesNumber = 0;
this.createTime = new Date();
this.status = 0;
}
}
- 业务实现
import java.math.BigDecimal;
@Getter
@Setter
public class Order extends SerializableDto {
private String orderId;
private BigDecimal amount;
private String productName;
}
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.entity.pojo.Order;
import com.xjw.service.MessageRecordService;
import com.xjw.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
public MessageRecordService messageRecordService;
@Override
@Transactional(rollbackFor = Exception.class)
public boolean start(Order order) {
//触发保存本地消息表
MessageRecord messageRecord = new MessageRecord(order.getOrderId(), 1);
messageRecordService.preCommit(messageRecord);
log.info("这里可以做本地业务操作");
log.info("下单中,请稍等-----");
log.info("恭喜您,下单成功,订单号:{}", order.getOrderId());
// 操作本地事务成功则commit 消息,如果处理本地事务异常,则会有定时任务回调
messageRecordService.commit(messageRecord.getMessageId(), true);
return true;
}
}
import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.mapper.MessageRecordMapper;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class MessageRecordServiceImpl implements MessageRecordService {
@Autowired
public MessageRecordMapper messageRecordMapper;
@Autowired
public RabbitmqService rabbitmqService;
@Override
public boolean preCommit(MessageRecord messageRecord) {
return messageRecordMapper.insert(messageRecord);
}
@Override
public boolean commit(String messageId, boolean commitFlag) {
if (!commitFlag) {
messageRecordMapper.delete(messageId);
return true;
}
// 提交消息到MQ
MessageRecord messageRecord = messageRecordMapper.find(messageId);
rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));
return true;
}
@Override
public void update(String messageId) {
messageRecordMapper.update(messageId);
}
@Override
public MessageRecord find(String messageId) {
return messageRecordMapper.find(messageId);
}
@Override
public List findAll(int status) {
return messageRecordMapper.findAll(status);
}
}
import com.xjw.callback.RabbitMq/confirm/iCallback;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RabbitmqServiceImpl implements RabbitmqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitMq/confirm/iCallback rabbitMq/confirm/iCallback;
@Override
public void sendMessage(String exchange, String routingKey, String messages, CorrelationData correlationData) {
rabbitTemplate.set/confirm/iCallback(rabbitMq/confirm/iCallback);
rabbitTemplate.convertAndSend(exchange, routingKey, messages, correlationData);
}
}
- 接口管理
import com.xjw.entity.pojo.Order;
import com.xjw.entity.vo.R;
import com.xjw.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.math.BigDecimal;
import java.util.UUID;
@RestController
@RequestMapping("/order")
@Validated
public class OrderController {
@Autowired
public OrderService orderService;
@PostMapping("/start")
public R page(@RequestBody String productName) {
Order order = new Order();
order.setAmount(BigDecimal.valueOf(5000));
order.setProductName(productName);
order.setOrderId(UUID.randomUUID().toString().replace("-", "").toLowerCase());
orderService.start(order);
return R.success();
}
}
- mq/本地消息回调
import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMq/confirm/iCallback implements RabbitTemplate./confirm/iCallback {
@Autowired
private MessageRecordService messageRecordService;
@Autowired
public RabbitmqService rabbitmqService;
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
String messageId = correlationData.getId();
// 未发送成功
if (!ack) {
MessageRecord messageRecord = messageRecordService.find(messageId);
if (null != messageRecord) {
// 重发
rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));
}
} else {
// 修改消息状态为成功
messageRecordService.update(messageId);
}
}
}
@Component
public class OrderMessageRecord/confirm/i implements MessageRecordCallback {
@Override
public boolean /confirm/i(MessageRecord messageRecord) {
String messageId = messageRecord.getMessageId();
if ("1212321".equals(messageId)) {
return true;
}
return false;
}
}
- 定时任务
import com.xjw.callback.MessageRecordCallback;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@EnableScheduling
public class MessageRecord/confirm/iTask {
@Autowired
public MessageRecordService messageRecordService;
@Autowired
public MessageRecordCallback messageRecordCallback;
@Scheduled(cron = "0 0/5 * * * ?")
public void /confirm/i() {
// 查询所有状态等于0(未提交的状态)
List all = messageRecordService.findAll(0);
if (null != all && all.size() > 0) {
all.forEach(messageRecord -> {
boolean confirm = messageRecordCallback./confirm/i(messageRecord);
// 根据回调结果执行提交或者回滚
messageRecordService.commit(messageRecord.getMessageId(), /confirm/i);
});
}
}
}



