准备篇 RabbitMQ安装文档
第一章 RabbitMQ快速入门篇
第二章 RabbitMQ的Web管理界面详解
第三章 RabbitMQ进阶篇之死信队列
第四章 RabbitMQ进阶篇之通过插件实现延迟队列
文章目录
系列文章目录前言一、什么是死信队列二、什么样的消息会变成死信三、什么是死信交换机四、死信处理过程五、如何使用死信交换机六、实例
6.1 数据库表设计6.2 配置文件6.3 页面6.4 队列绑定6.5 创建生产者6.6 创建消费者
前言
恭喜所有看到本篇文章的小伙伴,成功解锁了RabbitMQ系列之高级特性 死信队列的内容通过本文,你将清楚的了解到:什么是死信?什么是死信队列?死信队列如何使用?等本文最后,小名将通过一个实例,来帮助大家记忆死信队列
一、什么是死信队列
由于某些原因消息无法被正确的投递或是无法被正常消费的消息,为了确保此类消息不会被无故的丢弃 ,一般将会将他们存在一个特殊角色的队列中,这个队列一般称之为 死信队列 。
如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。所以死信队列是RabbitMQ中为异常处理的消息提供的一种保障机制。
二、什么样的消息会变成死信- 消息被拒绝,使用channel.basicNack或channel.basicReject,并且此时requeue属性被设置为false。消息在队列的存活时间超过设置的TTL时间消息队列的消息数量已经超过最大队列长度
Dead-Letter-Exchange(DLX),称之为死信交换机。当消息在一个队列中变成死信(dead messag)之后,它被重新发送到另一个交换器中,这个交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。
四、死信处理过程DLX 也是一个正常的交换器,和一般的交换器没有区别,实际上就是设置某个队列的属性,它能在任何的队列上被指定。当这个队列中存在死信时 RabbitMQ 就会自动将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个死信队列上。
配置死信队列分为以下步骤:
- 配置业务队列,绑定到业务交换机上为业务队列配置死信交换机和路由key为死信交换机配置死信队列
配置代码:
//订单最多存在10s
args.put("x-message-ttl", 10 * 1000);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", "ex.go.dlx");
//声明当前队列绑定的死信路由key
args.put("x-dead-letter-routing-key", "go.dlx");
web界面:
Time To Live(TTL)
RabbitMQ可以针对队列设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时,则消息变为dead letter(死信)
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange: 出现dead letter之后将dead letter重新发送到指定exchange。
x-dead-letter-routing-key: 出现dead letter之后将dead letter重新按照指定的routing-key发送。
演示业务:
模拟用户商城购买商品时的两种情况:1. 成功下单,2. 超时提醒
由于视频是小名分屏录制的,画面有些长,看不清内容的小伙伴劳烦 点击移步 全屏观看
业务场景:
- 用户下单用户下单后展示等待付款页面在页面上点击付款的按钮,如果不超时,则跳转到付款成功页面如果超时,则给用户发送消息通知,询问用户尚未付款,是否还需要?
CREATE TABLE `practice_dlx_order` ( `id` bigint(20) NOT NULL, `name` varchar(255) DEFAULT NULL COMMENT '订单名称', `price` decimal(10,2) DEFAULT NULL COMMENT '金额', `timeout` tinyint(1) DEFAULT '0' COMMENT '是否已超时:1-已超时,0-未超时', `pay` tinyint(1) DEFAULT NULL COMMENT '是否已付款:1-已付款,0-未付款', `order_time` datetime DEFAULT NULL COMMENT '下单时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;6.2 配置文件
spring:
#thymeleaf配置
thymeleaf:
#模板的模式,支持如:HTML、XML、TEXT、JAVAscript等
mode: HTML5
#编码,可不用配置
encoding: utf-8
servlet:
#内容类别,可不用配置
content-type: text/html
#开发配置为false,避免修改模板还要重启服务器
cache: false
#配置模板路径,默认就是templates,可不用配置
prefix: classpath:/templates/
rabbitmq:
host: 部署rabbitmq的服务器外网ip
port: 5672
username: 用户名
password: 密码
publisher-/confirm/is: true
publisher-returns: true
6.3 页面
- 下单页面
下单页面
- 结账页面
结账页面
- 支付成功页面
支付成功!等待发货~
6.4 队列绑定
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("ex.go", true, false);
}
@Bean
public Queue queue() {
Map args = new HashMap<>(3);
//订单最多存在10s
args.put("x-message-ttl", 10 * 1000);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", "ex.go.dlx");
//声明当前队列绑定的死信路由key
args.put("x-dead-letter-routing-key", "go.dlx");
return new Queue("q.go", true, false, false, args);
}
@Bean
public DirectExchange directExchangeDlx() {
return new DirectExchange("ex.go.dlx", true, false);
}
@Bean
public Queue queueDlx() {
return new Queue("q.go.dlx", true, false, false);
}
@Bean
public DirectExchange delayExchange() {
Map pros = new HashMap<>();
//设置交换机支持延迟消息推送
pros.put("x-delayed-message", "direct");
DirectExchange directExchange = new DirectExchange("ex.delay", true, false, pros);
directExchange.setDelayed(true);
return directExchange;
}
@Bean
public Queue delayQueue() {
Map args = new HashMap<>(3);
return new Queue("q.delay");
}
@Bean
public Binding bindingDelay(@Qualifier("delayQueue") Queue delayQueue,
@Qualifier("delayExchange") DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("q.delay");
}
@Bean
@Resource
public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("go").noargs();
}
@Bean
@Resource
public Binding bindingDlx(@Qualifier("queueDlx") Queue queueDlx, @Qualifier("directExchangeDlx") Exchange exchangeDlx) {
return BindingBuilder.bind(queueDlx).to(exchangeDlx).with("go.dlx").noargs();
}
}
6.5 创建生产者
@Controller
@RequestMapping("//practice-dlx-order")
public class PracticeDlxOrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private IPracticeDlxOrderService iPracticeDlxOrderService;
@RequestMapping("/orderPage")
public String orderPage() {
return "/dlx/orderPage";
}
@RequestMapping("/payPage")
public String payPage(HttpServletRequest httpServletRequest, ModelMap map) {
String form1name = httpServletRequest.getParameter("form1name");
String form1price = httpServletRequest.getParameter("form1price");
PracticeDlxOrder practiceDlxOrder = new PracticeDlxOrder();
practiceDlxOrder.setPay(false);//是否已付款:1-已付款,0-未付款
practiceDlxOrder.setPrice(new BigDecimal(form1price));
practiceDlxOrder.setName(form1name);
practiceDlxOrder.setOrderTime(new Date());
iPracticeDlxOrderService.getbaseMapper().insert(practiceDlxOrder);
//获取id
Long id = practiceDlxOrder.getId();
map.addAttribute("id", id);
rabbitTemplate.convertAndSend("ex.go", "go", String.valueOf(id));
return "dlx/payPage";
}
//支付成功
@RequestMapping("/paySuccessful")
public String paySuccessful(HttpServletRequest httpServletRequest) {
String id = httpServletRequest.getParameter("form1id");
UpdateWrapper dlxOrder = new UpdateWrapper<>();
dlxOrder.eq("id", Long.valueOf(id));
dlxOrder.set("pay",true);
iPracticeDlxOrderService.update(dlxOrder);
return "/dlx/paySuccessful";
}
}
6.6 创建消费者
@Component
@Slf4j
public class MqListener {
@Autowired
IPracticeDlxOrderService iPracticeDlxOrderService;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "q.go.dlx")
public void dlxListener(Message message, Channel channel) throws IOException {
System.out.println("支付超时");
Long id = Long.valueOf(new String(message.getBody(), "utf-8"));
PracticeDlxOrder order = iPracticeDlxOrderService.lambdaQuery().eq(PracticeDlxOrder::getId, id).one();
Boolean payStatue = order.getPay();
//判断是否支付
if (!payStatue) {//未支付,修改未超时
UpdateWrapper dlxOrder = new UpdateWrapper<>();
dlxOrder.eq("id", id);
dlxOrder.set("timeout", 1);
iPracticeDlxOrderService.update(dlxOrder);
log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), message, new Date().toString());
//未支付,10s后给用户发app信息
sendDelayMsg(id);
}
}
public Date dateRoll(Date date, int i, int d) {
// 获取Calendar对象并以传进来的时间为准
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
// 将现在的时间滚动固定时长,转换为Date类型赋值
calendar.add(i, d);
// 转换为Date类型再赋值
date = calendar.getTime();
return date;
}
//死信队列监听
@RabbitListener(queues = "q.delay")
public void delayListener(Message message, Channel channel) throws IOException {
System.out.println(new String(message.getBody()));
}
public void sendDelayMsg(Long id){
rabbitTemplate.setMandatory(true);
//id + 时间戳 全局唯一
Date date = DateUtil.getDate(new Date(),1,10);
CorrelationData correlationData = new CorrelationData(date.toString());
//发送消息时指定 header 延迟时间
rabbitTemplate.convertAndSend("ex.delay", "q.delay", "您的订单号:" + id + "尚未付款,是否还需要?",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setDelay(10*1000);
return message;
}
}, correlationData);
}
}
大功告成,大家可以 点我 查看成果
如若您在文章中发现任何错误的地方,希望您可以在评论区给予小名批评指正欄 如果觉得小名的文章帮助到了您,请关注小名的专栏【RabbitMQ】,支持一下小名,给小名的文章点赞、评论✍、收藏爛谢谢大家啦~♥♥♥



