栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

分布式(一)分布式事务解决方案

分布式(一)分布式事务解决方案

文章目录
  • 分布式事务
    • CAP理论
    • XA方案
    • TCC方案
    • 可靠消息最终一致性方案
    • 成熟开源的分布式事务框架
    • MQ实现分布式事务Demo

分布式事务

分布式事务主要有以下五种解决方案

  • XA方案,也叫两段式提交方案
  • TCC方案,也叫三段式提交方案
  • 可靠消息最终一致性方案
CAP理论

CAP理论中,C(Consistency)是强一致性,A(Availability)是可用性,P(Partition Tolerance)分区容错行。该理论在实际的分布式系统环境中,只能满足其中的两项,而不能同时满足这三个属性。

  • 一致性:所有节点上的数据时刻保持同步
  • 可用性:每个请求都应该受到相应,无论是成功还是失败
  • 分区容错性:系统应该持续的提供服务,即使分区内有数据丢失
XA方案

2PC其实就是保留一致性和分区容错性,而不考虑可用性。

在介绍2PC前,咱们应该了解一下XA规范。XA规范描述了全局事务管理与局部资源管理器之间的接口。XA规范的目的就是允许多个资源在同一个事务中访问,这样可以使得ACID特性实现跨多个应用而保持有效。

XA使用两阶段提交(2PC)来保证所有资源同时提交或者同时回滚。

正常情况下2PC提交流程:

异常情况下2PC提交流程

以上的2PC两阶段提交总结如下

  • 第一阶段(提交请求阶段)
    1. 全局事务管理器向所有资源发送提交请求,询问是否可以执行提价操作,并等待所有资源的响应
    2. 各个资源节点执行节点执行事务管理器的事务询问请求,并将Undo信息和Redo信息写入日志中
    3. 各个资源节点响应全局事务管理器的询问请求。如果资源节点成功的执行了事务操作,那么就响应给全局事务管理器为"同意",如果资源节点执行事务操作失败,则响应给全局事务管理器为"中止"
  • 第二阶段(提价执行阶段成功)
    1. 全局事务管理器开始向所有资源节点发送提交执行请求
    2. 所有资源节点收到全局事务管理器的提交请求后,资源节点开始执行事务提交
    3. 所有资源节点执行事务提交完成后,向全局事务管理器响应已完成消息
    4. 全局事务管理器收到所有资源节点的完成消息后,完成事务
  • 第二阶段(提交执行阶段失败)
    1. 全局事务管理器向所有所有资源节点发出”回滚操作”的请求。
    2. 所有资源节点利用之前写入的Undo信息执行回滚,并释放在整个事务期间内占用的资源。
    3. 所有资源节点向全局事务管理器发送”回滚完成”消息。
    4. 全局事务管理器收到所有所有资源节点反馈的”回滚完成”消息后,取消事务。

有时候,第二阶段也被称作完成阶段,因为无论结果怎样,协调者都必须在此阶段结束当前事务。

TCC方案

TCC方案可以分为三个阶段,分别为Try、/confirm/i、Cancel。

  1. Try阶段:这个阶段对各个服务的资源做检测以及对资源进行锁定后者预留
  2. /confirm/i阶段:这个阶段对各个服务执行实际的操作
  3. Cancel阶段:如果任何一个服务实际执行失败,就需要进行补偿,也就是对实际执行了真正操作的服务进行回滚。

以下以银行转账为例子描述TCC方案的大概流程:

  1. Try阶段:首先在设计表的时候,在账户表中多设计两个列。一个是预加字段,冻结字段。当指定Try阶段的时候,A给B转账1块钱,不直接进行账号的加减操作,而是首先在A的冻结字段上插入1,在B的预加字段上也插入1。以上的就是Try阶段的锁定资源或者预留资源。
  2. /confirm/i阶段:就是实际在A和B账户上进行实际的加减操作。A账户的账户余额字段减去冻结字段的值,B账户的余额字段加上预加字段的值。
  3. Cancel阶段:如果/confirm/i阶段任何一方执行失败了,则进行全部的回滚。就是比如A银行账户如果已经扣减了,但是B银行账户资金增加失败了,那么就得把A银行账户资金给加回去。
可靠消息最终一致性方案

这个的意思,就是干脆不要用本地的消息表了,直接基于 MQ 来实现事务。比如阿里的 RocketMQ 或则RabbitMQ就支持消息事务。

大概的意思就是:

  1. A 系统先发送一个 prepared 消息到 mq,如果这个 prepared 消息发送失败那么就直接取消操作别执行了;
  2. 如果这个消息发送成功过了,那么接着执行本地事务,如果成功就告诉 mq 发送确认消息,如果失败就告诉 mq 回滚消息;
  3. 如果发送了确认消息,那么此时 B 系统会接收到确认消息,然后执行本地的事务;
  4. mq 会自动定时轮询所有 prepared 消息回调你的接口,问你,这个消息是不是本地事务处理失败了,所有没发送确认的消息,是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。
  5. 这个方案里,要是系统 B 的事务失败了咋办?重试咯,自动不断重试直到成功,如果实在是不行,要么就是针对重要的资金类业务进行回滚,比如 B 系统本地回滚后,想办法通知系统 A 也回滚;或者是发送报警由人工来手工回滚和补偿。
  6. 这个还是比较合适的,目前国内互联网公司大都是这么玩儿的,要不你举用 RocketMQ 支持的,要不你就自己基于类似 ActiveMQ?RabbitMQ?自己封装一套类似的逻辑出来,总之思路就是这样子的。
成熟开源的分布式事务框架
  • ByteTCC
  • TCC-transaction
  • EasyTransaction
  • Seata(比较火爆)
  • 基于数据库XA/ JTA协议的方式:需要数据库厂商支持; JAVA组件有atomikos等
  • 异步校对数据的方式:支付宝、微信支付主动查询支付状态、对账单的形式;
  • 基于可靠消息(MQ)的解决方案:异步场景;通用性较强;拓展性较高
  • TCC编程式解决方案:严选、阿里、蚂蚁金服自己封装的DTX
MQ实现分布式事务Demo
  • MQ配置
import com.xjw.config.constant.RabbitmqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class OrderRabbitMqConfig {

    
    @Bean
    public Queue orderQueue() {
        return new Queue(RabbitmqConstant.ORDER_QUEUE, true);
    }

    
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(RabbitmqConstant.ORDER_EXCHANGE, true, false);
    }

    
    @Bean
    public Binding bind() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(RabbitmqConstant.ORDER_ROUTING_KEY);
    }
}

  • 实体类
import lombok.Getter;
import lombok.Setter;

import java.util.Date;
import java.util.UUID;


@Getter
@Setter
public class MessageRecord {

    
    private Long id;

    
    private String businessId;
    
    private int businessType;
    
    private String messageId;
    
    private int retriesNumber;
    
    private int status;
    
    private Date createTime;

    public MessageRecord() {
    }

    public MessageRecord(String businessId, int businessType) {
        this.businessId = businessId;
        this.businessType = businessType;
        this.messageId = UUID.randomUUID().toString().replace("-", "").toLowerCase();
        this.retriesNumber = 0;
        this.createTime = new Date();
        this.status = 0;
    }
}

  • 业务实现
import java.math.BigDecimal;


@Getter
@Setter
public class Order extends SerializableDto {

    
    private String orderId;

    
    private BigDecimal amount;

    
    private String productName;
}

import com.xjw.entity.pojo.MessageRecord;
import com.xjw.entity.pojo.Order;
import com.xjw.service.MessageRecordService;
import com.xjw.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;



@Service
@Slf4j
public class OrderServiceImpl implements OrderService {


    @Autowired
    public MessageRecordService messageRecordService;

    
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean start(Order order) {
        //触发保存本地消息表
        MessageRecord messageRecord = new MessageRecord(order.getOrderId(), 1);
        messageRecordService.preCommit(messageRecord);
        log.info("这里可以做本地业务操作");
        log.info("下单中,请稍等-----");
        log.info("恭喜您,下单成功,订单号:{}", order.getOrderId());
        // 操作本地事务成功则commit 消息,如果处理本地事务异常,则会有定时任务回调
        messageRecordService.commit(messageRecord.getMessageId(), true);
        return true;
    }
}

import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.mapper.MessageRecordMapper;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;


@Service
public class MessageRecordServiceImpl implements MessageRecordService {

    @Autowired
    public MessageRecordMapper messageRecordMapper;

    @Autowired
    public RabbitmqService rabbitmqService;

    @Override
    public boolean preCommit(MessageRecord messageRecord) {
        return messageRecordMapper.insert(messageRecord);
    }

    @Override
    public boolean commit(String messageId, boolean commitFlag) {
        
        if (!commitFlag) {
            messageRecordMapper.delete(messageId);
            return true;
        }
        // 提交消息到MQ
        MessageRecord messageRecord = messageRecordMapper.find(messageId);
        
        rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));
        return true;
    }

    @Override
    public void update(String messageId) {
        messageRecordMapper.update(messageId);
    }

    @Override
    public MessageRecord find(String messageId) {
        return messageRecordMapper.find(messageId);
    }

    @Override
    public List findAll(int status) {
        return messageRecordMapper.findAll(status);
    }
}

import com.xjw.callback.RabbitMq/confirm/iCallback;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class RabbitmqServiceImpl implements RabbitmqService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitMq/confirm/iCallback rabbitMq/confirm/iCallback;

    
    @Override
    public void sendMessage(String exchange, String routingKey, String messages, CorrelationData correlationData) {
        
        rabbitTemplate.set/confirm/iCallback(rabbitMq/confirm/iCallback);
        rabbitTemplate.convertAndSend(exchange, routingKey, messages, correlationData);
    }
}


  • 接口管理
import com.xjw.entity.pojo.Order;
import com.xjw.entity.vo.R;
import com.xjw.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;
import java.util.UUID;


@RestController
@RequestMapping("/order")
@Validated
public class OrderController {

    @Autowired
    public OrderService orderService;

    @PostMapping("/start")
    public R page(@RequestBody String productName) {
        Order order = new Order();
        order.setAmount(BigDecimal.valueOf(5000));
        order.setProductName(productName);
        order.setOrderId(UUID.randomUUID().toString().replace("-", "").toLowerCase());
        orderService.start(order);
        return R.success();
    }
}

  • mq/本地消息回调
import com.alibaba.fastjson.JSON;
import com.xjw.config.constant.RabbitmqConstant;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import com.xjw.service.RabbitmqService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class RabbitMq/confirm/iCallback implements RabbitTemplate./confirm/iCallback {

    @Autowired
    private MessageRecordService messageRecordService;

    @Autowired
    public RabbitmqService rabbitmqService;

    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
        
        String messageId = correlationData.getId();
        // 未发送成功
        if (!ack) {
            MessageRecord messageRecord = messageRecordService.find(messageId);
            if (null != messageRecord) {
                // 重发
                rabbitmqService.sendMessage(RabbitmqConstant.ORDER_EXCHANGE, RabbitmqConstant.ORDER_ROUTING_KEY, JSON.toJSONString(messageRecord), new CorrelationData(messageRecord.getMessageId()));
            }
        } else {
            // 修改消息状态为成功
            messageRecordService.update(messageId);
        }
    }
}

@Component
public class OrderMessageRecord/confirm/i implements MessageRecordCallback {

    @Override
    public boolean /confirm/i(MessageRecord messageRecord) {
        String messageId = messageRecord.getMessageId();
        
        if ("1212321".equals(messageId)) {
            return true;
        }
        return false;
    }
}


  • 定时任务
import com.xjw.callback.MessageRecordCallback;
import com.xjw.entity.pojo.MessageRecord;
import com.xjw.service.MessageRecordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;


@Component
@EnableScheduling
public class MessageRecord/confirm/iTask {

    @Autowired
    public MessageRecordService messageRecordService;

    @Autowired
    public MessageRecordCallback messageRecordCallback;

    
    @Scheduled(cron = "0 0/5 * * * ?")
    public void /confirm/i() {
        // 查询所有状态等于0(未提交的状态)
        List all = messageRecordService.findAll(0);
        if (null != all && all.size() > 0) {
            all.forEach(messageRecord -> {
                boolean confirm = messageRecordCallback./confirm/i(messageRecord);
                // 根据回调结果执行提交或者回滚
                messageRecordService.commit(messageRecord.getMessageId(), /confirm/i);
            });
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/632819.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号