栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

消息中间件之RabbitMQ(三):死信队列和延迟队列

消息中间件之RabbitMQ(三):死信队列和延迟队列

死信队列 概念

死信,就是由于某些原因无法被正常消费的消息死信队列,就是用来存储死信的队列。死信交换机,实际上就是一个普通的direct类型交换机应用场景:

因为没有被正常消费的消息被路由到了死信队列,这保证了消息数据不丢失,相当于在持久化、发布-确认的基础上又加了一层保护。订单创建后在指定时间未支付时自动失效(要结合延迟队列来实现)
死信的来源

消息 TTL 过期:TTL是Time To Live的缩写, 也就是生存时间队列达到最大长度:队列满了,无法再添加数据到 mq 中消息被拒绝:(basic.reject 或 basic.nack) 并且 requeue=false 生产者核心代码(原生代码)

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间 10s
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//该信息是用作演示队列个数限制
for (int i = 1; i < 11; i++) {
    String message = "info" + i;
    channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
    System.out.println("生产者发送消息:" + message);
}
正常消费者核心代码
//正常队列绑定死信队列信息
Map params = new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
//设置正常队列的长度限制,例如发10个,4个则为死信
params.put("x-max-length", 6);

//正常队列
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    if (message.equals("info5")) {
        System.out.println("Consumer 接收到消息" + message + "并拒绝签收该消息");
        //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
        channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    } else {
        System.out.println("Consumer 接收到消息" + message);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
};
//拒绝消息的前提是开启手动应答
channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});
死信消费者核心代码
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("Dead Consumer 接收到消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});
延迟队列 概念

就是用来存放需要在指定时间被处理的元素的队列使用场景:

订单在十分钟之内未支付则自动取消新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒用户注册成功后,如果三天内没有登陆则进行短信提醒用户发起退款,如果三天内没有得到处理则通知相关运营人员预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议 基于死信的延迟队列

该方式是通过消息在延迟队列中TTL到期后,路由到死信队列来实现的消息延迟处理

该方式需要为不同的TTL时间的消息创建不同的延迟队列,比较麻烦

架构图(创建两个TTL时间不同的延迟队列QA、QB,消息到期后路由到同一个死信队列QD):

配置类核心代码(SpringBoot方式)

//声明队列A  并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA() {
    return QueueBuilder.durable(QUEUE_A).deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
        .deadLetterRoutingKey("YD")
        .ttl(10000).build();//设置队列中消息的ttl
}

// 队列A 绑定X交换机
@Bean
public Binding queueaBindingX(Queue queueA, DirectExchange xExchange) {
    return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}

生产者核心代码:

@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
    rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
}

消费者核心代码:

@RabbitListener(queues = "QD")
public void receiveD(Message message) {
    String msg = new String(message.getBody());
    log.info("当前时间:{},收到死信队列信息{}", new Date(), msg);
}
基于插件的延迟队列(推荐)

MQ默认无法实现同一个队列延迟不同TTL的消息

该方式通过自定义交换机,实现了一个通用的延迟队列

下载安装步骤:

rabbitmq_delayed_message_exchange 插件下载地址(注意版本兼容性)放置到 RabbitMQ 的插件目录sbin目录下执行命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange重启MQ

架构图(只需要一个普通队列,关键在于这个交换机):

配置类核心代码:

@Bean
public Queue delayedQueue() {
    return new Queue(DELAYED_QUEUE_NAME);
}

//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
    Map args = new HashMap<>();
    args.put("x-delayed-type", "direct");//自定义交换机的类型
    return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}

@Bean
public Binding bindingDelayedQueue(Queue queue, CustomExchange delayedExchange) {
    return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}

生产者核心代码:

rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY,
                message,
                correlationData -> {
                    correlationData.getMessageProperties().setDelay(delayTime);
                    return correlationData;
                });

消费者核心代码:

@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
    String msg = new String(message.getBody());
    log.info("当前时间:{},收到延时队列的消息:{}", new Date(), msg);
}
完整代码

完整代码:GitHub

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774836.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号