上一篇文章中阐述了用户下单的逻辑以及具体的技术点,本篇阐述另外一个重要的业务逻辑,即订单消息的异步通知机制
基于rocketmq实现订单消息的异步通知
订单创建完成之后,用户需要在指定的时间内支付该订单,支付成功之后,则需要安排订单的发货,这里需要调用第三方物流接口完成物流订单的创建,然而第三方物流接口是不稳定的,可能存在不可用的情况,因此如果直接在支付模块中调用第三方物流接口,如果遇到调用失败的情形,则整个订单被视为支付失败,这样对于用户的体验很不好。
因此这里需要进行业务的解耦,即将物流服务和支付服务解耦,支付完成之后发送一个支付成功的消息到MQ中,物流服务从MQ中消费消息,并根据消息中订单信息调用第三方物流接口创建物流订单,如果因为接口超时获取不可用情形导致创建物流订单失败,则物流服务模块可以进行重试或者由人工进行干预,这样订单的支付业务不会受到影响。
然而这里也需要考虑到分布式调用的情况,对于订单支付功能,主要包含2个业务逻辑:
- 校验用户余额,并且扣减用户余额
- 发送支付成功的消息到MQ中
这2个操作必须保证原子性,即如果用户余额不足,则不能扣减余额也不能发送支付成功消息;如果某个时刻MQ不可用,发送到MQ中的消息失败,则扣减的余额也需要回滚。
这里再采用上文提到的seata框架无法解决,因为发送到MQ中的消息无法进行回滚操作。在此选用rocketmq,因为rocketmq支持事务消息,该特性可以很好地解决本文前述需求。
rocketmq基于半消息(half message)实现分布式事物,half message是一种特殊的消息,发送到broker中的half message不能被消费,需要发送端主动确认该消息的状态,发送端可以在发送half message之后执行本地事务,若本地事务执行成功则发送commit消息,broker收到commit消息之后会将之前的half message投递到真实的topic中,则该消息可以被消费;若本地事务执行失败则发送rollback消息,broker会删除之前的half message
基于rocketmq事务消息实现订单支付的业务逻辑流程如下:
如上图所示,支付订单时,首先发送一个half message类型的订单支付消息到broker中,然后执行本地事务,即扣减用户余额,若余额扣减成功,则发送commit消息到broker,即本次支付完成;若余额扣减失败,则发送rollback消息到broker,即本次支付失败,且下游服务不会收到该请求。
这里还需考虑另外一个问题,即由于网络问题导致发送到broker中的commit或rollback消息失败,则broker一直无法收到该half message的确认信息;rocketmq提供了一种反查机制,即对于超过一定时间的half message,主动询问发送端该消息对应本地事务的状态,发送端需要作出回应commit或者rollback。
rocketmq事务消息需要实现TransactionListener接口
public class OrderTransactionListener implements TransactionListener {
private static final Logger logger = LoggerFactory.getLogger(OrderTransactionListener.class);
@Autowired
private UserService userService;
@Autowired
private MQTransactionMapper transactionMapper;
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
logger.info("开始执行本地事务: {}", message);
String body = new String(message.getBody());
OrderMessage orderMessage = OrderMessage.toOrderMessage(body);
if (null == orderMessage) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
Long userId = orderMessage.getUserId();
Long totalMoney = orderMessage.getTotalPrice();
String orderId = orderMessage.getOrderId();
String transactionId = message.getTransactionId();
try {
//执行本地事务,扣减余额并且记录事务表
if (userService.addOrDecreaseMoney(userId, totalMoney, transactionId, orderId)) {
return LocalTransactionState.COMMIT_MESSAGE;
}
} catch (Exception e) {
logger.error("执行本地事务失败:", e);
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//在本地事务表中查找对应的事物是否存在,若存在则表明扣款成功
String transactionId = messageExt.getTransactionId();
MQTransaction transaction = transactionMapper.getTransaction(transactionId);
if (null != transaction) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
这里需要一个本地事务表来实现反查的功能,即扣款成功了在本地事务表插入一条记录,反查时若本地事务表中能查询到该事务则表明扣款成功
@Transactional
public boolean addOrDecreaseMoney(Long userId, Long money, String transactionId, String orderId) {
User user = userMapper.getUserById(userId);
if (null != user) {
if (userMapper.addOrDecreaseMoney(userId, money, user.getVersion()) <= 0) {
return false;
}
//若扣减余额成功,则插入一条本地事务记录
MQTransaction transaction = new MQTransaction();
transaction.setId(transactionId);
transaction.setBusiness("order");
transaction.setOrderId(orderId);
transactionMapper.addTransaction(transaction);
return true;
}
return false;
}
下面演示一下效果:
创建订单之后,支付订单:
rocketmq中写入该消息:
下游服务消息该消息并写入到mysql中:
若用户余额不足导致支付失败
则rocketmq中不会写入该消息,mysql中也没有该记录:



