栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

基于spring cloud实现订单服务框架demo(三)

基于spring cloud实现订单服务框架demo(三)

上一篇文章中阐述了用户下单的逻辑以及具体的技术点,本篇阐述另外一个重要的业务逻辑,即订单消息的异步通知机制

基于rocketmq实现订单消息的异步通知
订单创建完成之后,用户需要在指定的时间内支付该订单,支付成功之后,则需要安排订单的发货,这里需要调用第三方物流接口完成物流订单的创建,然而第三方物流接口是不稳定的,可能存在不可用的情况,因此如果直接在支付模块中调用第三方物流接口,如果遇到调用失败的情形,则整个订单被视为支付失败,这样对于用户的体验很不好。

因此这里需要进行业务的解耦,即将物流服务和支付服务解耦,支付完成之后发送一个支付成功的消息到MQ中,物流服务从MQ中消费消息,并根据消息中订单信息调用第三方物流接口创建物流订单,如果因为接口超时获取不可用情形导致创建物流订单失败,则物流服务模块可以进行重试或者由人工进行干预,这样订单的支付业务不会受到影响。

然而这里也需要考虑到分布式调用的情况,对于订单支付功能,主要包含2个业务逻辑:

  1. 校验用户余额,并且扣减用户余额
  2. 发送支付成功的消息到MQ中

这2个操作必须保证原子性,即如果用户余额不足,则不能扣减余额也不能发送支付成功消息;如果某个时刻MQ不可用,发送到MQ中的消息失败,则扣减的余额也需要回滚。

这里再采用上文提到的seata框架无法解决,因为发送到MQ中的消息无法进行回滚操作。在此选用rocketmq,因为rocketmq支持事务消息,该特性可以很好地解决本文前述需求。

rocketmq基于半消息(half message)实现分布式事物,half message是一种特殊的消息,发送到broker中的half message不能被消费,需要发送端主动确认该消息的状态,发送端可以在发送half message之后执行本地事务,若本地事务执行成功则发送commit消息,broker收到commit消息之后会将之前的half message投递到真实的topic中,则该消息可以被消费;若本地事务执行失败则发送rollback消息,broker会删除之前的half message

基于rocketmq事务消息实现订单支付的业务逻辑流程如下:

如上图所示,支付订单时,首先发送一个half message类型的订单支付消息到broker中,然后执行本地事务,即扣减用户余额,若余额扣减成功,则发送commit消息到broker,即本次支付完成;若余额扣减失败,则发送rollback消息到broker,即本次支付失败,且下游服务不会收到该请求。
这里还需考虑另外一个问题,即由于网络问题导致发送到broker中的commit或rollback消息失败,则broker一直无法收到该half message的确认信息;rocketmq提供了一种反查机制,即对于超过一定时间的half message,主动询问发送端该消息对应本地事务的状态,发送端需要作出回应commit或者rollback。

rocketmq事务消息需要实现TransactionListener接口

public class OrderTransactionListener implements TransactionListener {

    private static final Logger logger = LoggerFactory.getLogger(OrderTransactionListener.class);

    @Autowired
    private UserService userService;

    @Autowired
    private MQTransactionMapper transactionMapper;

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {

        logger.info("开始执行本地事务: {}", message);
        String body = new String(message.getBody());
        OrderMessage orderMessage = OrderMessage.toOrderMessage(body);
        if (null == orderMessage) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        Long userId = orderMessage.getUserId();
        Long totalMoney = orderMessage.getTotalPrice();
        String orderId = orderMessage.getOrderId();
        String transactionId = message.getTransactionId();

        try {
            //执行本地事务,扣减余额并且记录事务表
            if (userService.addOrDecreaseMoney(userId, totalMoney, transactionId, orderId)) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception e) {
            logger.error("执行本地事务失败:", e);
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

        //在本地事务表中查找对应的事物是否存在,若存在则表明扣款成功
        String transactionId = messageExt.getTransactionId();
        MQTransaction transaction = transactionMapper.getTransaction(transactionId);
        if (null != transaction) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        return LocalTransactionState.UNKNOW;
    }
}

这里需要一个本地事务表来实现反查的功能,即扣款成功了在本地事务表插入一条记录,反查时若本地事务表中能查询到该事务则表明扣款成功

@Transactional
public boolean addOrDecreaseMoney(Long userId, Long money, String transactionId, String orderId) {
        User user = userMapper.getUserById(userId);
        if (null != user) {
            if (userMapper.addOrDecreaseMoney(userId, money, user.getVersion()) <= 0) {
                return false;
            }
            //若扣减余额成功,则插入一条本地事务记录
            MQTransaction transaction = new MQTransaction();
            transaction.setId(transactionId);
            transaction.setBusiness("order");
            transaction.setOrderId(orderId);
            transactionMapper.addTransaction(transaction);
            return true;
        }
        return false;
    }

下面演示一下效果:
创建订单之后,支付订单:

rocketmq中写入该消息:

下游服务消息该消息并写入到mysql中:

若用户余额不足导致支付失败

则rocketmq中不会写入该消息,mysql中也没有该记录:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/311264.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号