背景做过电商系统的人都会遇到一个场景,就是下了订单之后,订单支付会有一个有效期,超时订单自动关闭。实现的技术有很多,再次讨论基于RabbitMQ进行实现
思路这个是基于RabbitMQ的延迟队列实现的,那需要讨论下什么是延迟队列
延迟队列延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不
想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费 。PS: 在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 DLX(死信队列) 和 TTL 模拟出延迟队列的功能,所以需要讨论下什么是死信队列
死信队列DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当
消息在一个队列中变成死信( dead message )之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
消息一般在一下情况下会转换成死信队列:
- 消息被拒绝( Basic.Reject/Basic.Nack ),井且设置 requeue 参数为 false;
- 消息过期(TTL);
- 队列达到最大长度。
我们这里采用消息过期的方式进行分析实现
PS: 建立两个队列(可以是不同交换器),将队列A中信息设置成一个过期时间,当消息过期之后,会自动投递到队列B,那么监听队列B的消费根据再进行接下来的业务处理
实现
maven配置
org.springframework.boot spring-boot-starter-actuatororg.springframework.boot spring-boot-starter-webcom.baomidou mybatis-plus-boot-starter3.3.1.tmp com.github.xiaoymin knife4j-spring-boot-starter2.0.1 guava com.google.guava com.google.guava guava20.0 org.springframework.boot spring-boot-starter-amqporg.springframework.amqp spring-rabbit-testtest mysql mysql-connector-java5.1.29 org.projectlombok lomboktrue
基于Springboot的RabbitMQ配置
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.116.128:3306/store?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
username: root
password: root
hikari:
pool-name: coreHikariPool
maximum-pool-size: 12
connection-timeout: 30000
minimum-idle: 10
idle-timeout: 500000
max-lifetime: 540000
connection-test-query: SELECT 1
auto-commit: true
rabbitmq:
addresses: 192.168.116.143:5672,192.168.116.144:5672,192.168.116.145:5672
username: admin
password: admin$
配置启动时候创建的队列
package com.example.order.core.config;
public enum QueueEnum {
QUEUE_AUTO_CANCEL("vv_x", "zz_test", "key"),
QUEUE_DELAY_ORDER("vv_x_ttl", "zz_test_ttl", "key_ttl");
QueueEnum(String exchange, String queue, String routeKey) {
this.exchange = exchange;
this.queue = queue;
this.routeKey = routeKey;
}
public String getExchange() {
return exchange;
}
public String getQueue() {
return queue;
}
public String getRouteKey() {
return routeKey;
}
private String exchange;
public String queue;
private String routeKey;
}
package com.example.order.core.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_AUTO_CANCEL.getExchange())
.durable(true)
.build();
}
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_DELAY_ORDER.getExchange())
.durable(true)
.build();
}
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_AUTO_CANCEL.getQueue());
}
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_DELAY_ORDER.getQueue())
//到期后转发的交换机
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_AUTO_CANCEL.getExchange())
//到期后转发的路由键
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_AUTO_CANCEL.getRouteKey())
.build();
}
@Bean
Binding orderBinding(DirectExchange orderDirect, Queue orderQueue) {
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_AUTO_CANCEL.getRouteKey());
}
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) {
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_DELAY_ORDER.getRouteKey());
}
}
消息生产者
package com.example.order.core.listener;
import com.example.order.core.config.QueueEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class OrderMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(Long orderId, final long delayTimes) {
//给延迟队列发送消息
rabbitTemplate.convertAndSend(QueueEnum.QUEUE_DELAY_ORDER.getExchange(), QueueEnum.QUEUE_DELAY_ORDER.getRouteKey()
, orderId, message -> {
//给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
log.info("send delay message orderId:{}", orderId);
}
}
消息消费者
package com.example.order.core.listener;
import com.example.order.core.service.ShopOrderOperateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class AutoCancelListener {
@Autowired
private ShopOrderOperateService operateService;
@RabbitListener(queues = "zz_test")
@RabbitHandler
public void handle(Long orderId) {
boolean b = operateService.cancelOrder(orderId);
log.info("receive delay message orderId:{},and auto cancel flag:{}", orderId, b);
}
}
真正实现自动取消订单的实现类及所有依赖
package com.example.order.core.service;
import com.example.order.core.entity.ShopOrderDO;
public interface ShopOrderOperateService {
boolean save(ShopOrderDO dto);
boolean cancelOrder(Long orderId);
}
package com.example.order.core.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@Data
@TableName("shop_order")
@ApiModel(value = "实体")
public class ShopOrderDO extends baseDO {
private static final long serialVersionUID = -8985653170140721455L;
@ApiModelProperty(value = "订单状态", notes = "1:待支付,2:支付成功,3:订单超时 4:订单支付失败")
private Integer status;
@ApiModelProperty(value = "重要成都", notes = "")
private Integer importance;
@ApiModelProperty(value = "备注", notes = "")
private String remark;
@Override
public String toString() {
return "ShopOrderDO{" +
" id=" + id +
", status=" + status +
'}' + "过了:" + (System.currentTimeMillis() - this.getGmtCreate().getTime()) / 1000 + "s";
}
}
//ShopOrderMapper.xml
id, gmt_create, creator, gmt_modified, modifier, status, is_deleted, importance,remark
package com.example.order.core.mapper; import com.baomidou.mybatisplus.core.mapper.baseMapper; import com.example.order.core.entity.ShopOrderDO; public interface ShopOrderMapper extends baseMapper{ }
package com.example.order.core.service; import com.baomidou.mybatisplus.extension.service.IService; import com.example.order.core.entity.ShopOrderDO; public interface ShopOrderService extends IService{ }
package com.example.order.core.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.example.order.core.entity.ShopOrderDO; import com.example.order.core.mapper.ShopOrderMapper; import com.example.order.core.service.ShopOrderService; import org.springframework.stereotype.Service; @Service public class ShopOrderServiceImpl extends ServiceImplimplements ShopOrderService { }
package com.example.order.core.service;
import com.example.order.core.entity.ShopOrderDO;
public interface ShopOrderOperateService {
boolean save(ShopOrderDO dto);
boolean cancelOrder(Long orderId);
}
package com.example.order.core.service.impl;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.core.entity.ShopOrderDO;
import com.example.order.core.listener.OrderMessageSender;
import com.example.order.core.service.ShopOrderOperateService;
import com.example.order.core.service.ShopOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
@Slf4j
@Service
public class ShopOrderOperateServiceImpl implements ShopOrderOperateService {
@Resource
OrderMessageSender sender;
@Autowired
private ShopOrderService ext;
@Override
public boolean save(ShopOrderDO dto) {
try {
dto.setStatus(1);
boolean save = ext.save(dto);
if (save) {
sender.sendMessage(dto.getId(), 1 * 60 * 1000);
}
return true;
} catch (Exception e) {
log.error(".....{}", e);
}
return false;
}
@Override
public boolean cancelOrder(Long orderId) {
//把待支付中的订单设置成订单失效
LambdaUpdateWrapper update = Wrappers.lambdaUpdate();
update.eq(ShopOrderDO::getId, orderId);
update.eq(ShopOrderDO::getStatus, 1);
update.set(ShopOrderDO::getStatus, 3);
update.set(ShopOrderDO::getGmtModified, new Date());
update.set(ShopOrderDO::getModifier, "auto");
update.set(ShopOrderDO::getRemark, "过期了");
return ext.update(update);
}
}
模拟新增订单的接口
package com.example.order.core.controller;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.core.entity.ShopOrderDO;
import com.example.order.core.service.ShopOrderOperateService;
import com.example.order.core.service.ShopOrderService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
import java.util.List;
@RestController
@Api(tags = "-相关接口")
@RequestMapping(value = "/shop/order", produces = MediaType.APPLICATION_JSON_VALUE)
public class ShopOrderController {
@Autowired
private ShopOrderOperateService operateService;
@Autowired
private ShopOrderService ext;
@GetMapping("list")
@ApiOperation(value = "查询集合")
public List listShopOrderServiceByPage(ShopOrderDO query) {
LambdaQueryWrapper wrapper = Wrappers.lambdaQuery(query);
return ext.list(wrapper);
}
@GetMapping("{id}")
@ApiOperation(value = "获取某一实体")
public ShopOrderDO getShopOrderServiceDetails(@PathVariable Long id) {
return ext.getById(id);
}
@PostMapping
@ApiOperation(value = "新增数据")
public boolean saveShopOrderService(@RequestBody ShopOrderDO dto) {
dto.setGmtCreate(new Date());
dto.setGmtModified(new Date());
dto.setCreator("初始化");
dto.setRemark("新建");
return operateService.save(dto);
}
@PutMapping("{id}")
@ApiOperation(value = "修改数据")
public boolean modifyShopOrderService(@RequestBody ShopOrderDO dto, @PathVariable Long id) {
dto.setId(id);
dto.setGmtModified(new Date());
dto.setModifier("更新");
dto.setRemark("手动更新");
return ext.updateById(dto);
}
}
PS: 消费者拿到消息之后,可以根据消息获取到订单信息,根据订单信息进行操作是过期还是其他状态
当然,如果要确保订单消息一定不会丢失,还可以使用RabbitMQ的发送确认功能,这里略过不提
记录下以备后用



