栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 死信交换机&延迟队列

RabbitMQ 死信交换机&延迟队列

一,死信队列(延迟队列)

死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。

一般来说,生产者将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待 条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。

死信消息来源:

消息 TTL 过期

队列满了,无法再次添加数据

消息被拒绝(reject 或 nack),并且 requeue =false

代码编写:

编写可根据上篇博客来

交换机的使用

1,生产者创建队列和交换机

package com.lgs.scz.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@SuppressWarnings("all")
public class DeadConfig {

    
    @Bean
    public Queue normalQueue(){
        Map config=new HashMap<>();
        //过期时间
        config.put("x-message-ttl", 10000);
        //死信交换机
        config.put("x-dead-letter-exchange", "deadExchange");
        //死信routing key
        config.put("x-dead-letter-routing-key", "DD");
        return new Queue("normalQueue",true,false,false,config);
    }

    @Bean
    public Queue deadQueue(){
        return new Queue("deadQueue",true);
    }

    
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normalExchange");
    }

    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange("deadExchange");
    }

    
    @Bean
    public Binding normalBinding() {
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("CC");
    }

    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("DD");
    }


}

2,生产者发送信息

package com.lgs.scz.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SuppressWarnings("all")
@Slf4j
public class ProviderController {

    @Autowired
    private RabbitTemplate template;

    @RequestMapping("/deadSend")
    public String deadSend(){
        log.warn("订单已经保存");
        template.convertAndSend("normalExchange","CC","order-1902");
        return "yes";
    }

}

3,消费者接收信息

package com.lgs.xfz.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@RabbitListener(queues = "deadQueue")
@Slf4j
public class DeadReceiver {

    @RabbitHandler
    public void process(String message){
        log.warn(message+":该订单已经过期");
    }

}

OK!到这就结束了,希望能帮到你!!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745013.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号