栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ——死信队列

RabbitMQ——死信队列

1. 死信队列 1.1 概念

死信就是无法被消费的消息,消费者从队列中取消息时,由于某些特定原因导致消息无法被消费,即没有了后续的处理,就变成了死信继而有了死信队列。

1.2 应用场景

可以保证消息不会消失,如果消费者在进行消费时发送异常,可以先放到死信队列中,等后面运行环境好了之后再进行消费。

1.3 死信来源
    消息TTL过期;队列达到最大长度;消息被拒绝。
1.4 死信实战

架构图:

    正常情况下使用normal交换机绑定给normal队列C1进行消费;出现异常情况就转发给 dead交换机绑定给dead队列让C2进行消费。
1.4.1 消息TTL过期

测试过程:

    开启消费者1构建交换机队列RoutingKey之间的关系;然后关闭消费者1;开启生产者发送消息,等待十秒之后消息过期就会到达死信队列;然后启动消费者2接收死信队列里面的消息。

消费者1:

public class Consumer1 {


    // 队列名称
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String DEAD_QUEUE = "dead_queue";

    // 交换机名称
    private static final String NORMAL_EXCHANGE = "e_ch";
    private static final String DEAD_EXCHANGE = "d_ch";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明队列和交换机类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        Map arguments = new HashMap<>();
        // 设置过期时间 10秒 ; 换做在发送方指定。
//        arguments.put("x-message-ttl",10000);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        // 设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");

        // 创建队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false
                ,arguments);
        // 绑定RoutingKey
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

        // 创建死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("C1等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("C1接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };

        channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, consumerTag ->{});
    }

}

消费者2:

public class Consumer2 {

    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C2等待接收消息...");
        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("C2接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        channel.basicConsume(DEAD_QUEUE,true, deliverCallback, consumerTag ->{});
    }

}

生产者:

public class Producer {

    private static final String NORMAL_EXCHANGE = "e_ch";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 声明交换机。
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 设置TTL时间存活时间 10 秒
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        for (int i = 0; i < 11; i++) {
            String msg = "message" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送成功:" + msg);
        }
    }
}
1.4.2 队列达到最大长度

修改消费者1,指定最多长度为6条消息,超过这6条消息之后的消息会进入到死信中。
消费者1给村参数的map新增一条:

arguments.put("x-max-length",6);

删除生产者中对队列消息事件的限制。
如果出现406错误就是已经存在队列,可以删除原有队列或者修改一个新名字。

1.4.3 消息被拒

修改消费者1,拒绝最后一位为双数的消息。

消费者1:

public class Consumer1 {


    // 队列名称
    private static final String NORMAL_QUEUE = "normal_queue1";
    private static final String DEAD_QUEUE = "dead_queue1";

    // 交换机名称
    private static final String NORMAL_EXCHANGE = "e_ch1";
    private static final String DEAD_EXCHANGE = "d_ch1";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明队列和交换机类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        Map arguments = new HashMap<>();
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        // 设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");

        // 创建队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false
                ,arguments);
        // 绑定RoutingKey
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

        // 创建死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("C1等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            // 拒绝最后为单数的消息
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            if (Integer.parseInt(msg.substring(msg.length()-1)) % 2 == 0){
                System.out.println("C1拒绝的消息:" + msg +"----------===========-----------");
                // 拒绝消息
                
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else{
                System.out.println("C1接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        };
        // 开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, consumerTag ->{});
    }

}

先启动消费者1创建对应关系,随之启动消费者2,再启动生产者。

生产者发送:

消费者1:

消费者2:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761585.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号