栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ——延迟队列

RabbitMQ——延迟队列

1. 延迟队列 1.1 概念

延迟队列内部是有序的,其中的元素是希望在指定时间到了之后取出或者处理,总之就是用来存放需要在指定时间被处理的元素的队列。

1.2 使用场景

用来在一定时间间隔内对相应的事件做出提醒或者操作。

1.3 整合SpringBoot

正常新建SpringBoot项目即可。SpringBoot版本为2.5.6,如果版本过高与Swagger2版本相差过大就会抛出异常。
添加依赖:


        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-amqp
        

        
            org.springframework.boot
            spring-boot-starter-web
            2.5.6
        

        
            com.alibaba
            fastjson
            1.2.78
        

        
            org.projectlombok
            lombok
            1.18.20
        

        
            io.springfox
            springfox-swagger2
            2.9.2
        

        
            io.springfox
            springfox-swagger-ui
            2.9.2
        

        
            org.springframework.amqp
            spring-rabbit-test
            test
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

新增Swagger配置类:

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket webApiConfig(){
        return new Docket(documentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo(){
        return new ApiInfoBuilder()
                .title("RabbitMQ接口文档")
                .description("RabbitMQ文档接口定义")
                .version("1.0")
                .contact(new Contact("RabbitMQ","https://www.baidu.com"
                        ,"baidu@qq.com"))
                .build();
    }
}

配置文件:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
1.4 延迟队列实战 1.4.1 图示

    P:生产者;X:普通交换机;Y:延迟交换机;QA:延迟10s普通队列;QB:延迟40s普通队列;QD:死信队列;C:消费者。
1.4.2 队列配置

主要是基于SpringBoot方式将队列交换机RotingKey等绑定起来。

@Configuration
public class TtlQueueConfig {

    // 普通交换机
    private static final String X_EXCHANGE = "X";

    // 死信交换机
    private static final String DEAD_LETTER_Y_EXCHANGE = "Y";

    // 普通队列
    private static final String QUEUE_A = "QA";
    private static final String QUEUE_B = "QB";

    // 死信队列
    private static final String DEAD_LETTER_QUEUE_D = "QD";

    // 设置普通交换机
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    // 设置死信交换机
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(DEAD_LETTER_Y_EXCHANGE);
    }

    // 设置普通队列
    @Bean("queueA")
    public Queue aQueue(){
        Map argument = new HashMap<>();
        // 设置死信交换机
        argument.put("x-dead-letter-exchange",DEAD_LETTER_Y_EXCHANGE);
        // 设置死信RoutingKey
        argument.put("x-dead-letter-routing-key","YD");
        // 设置过期时间
        argument.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(argument).build();
    }

    @Bean("queueB")
    public Queue bQueue(){
        Map argument = new HashMap<>();
        // 设置死信交换机
        argument.put("x-dead-letter-exchange",DEAD_LETTER_Y_EXCHANGE);
        // 设置死信RoutingKey
        argument.put("x-dead-letter-routing-key","YD");
        // 设置过期时间
        argument.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(argument).build();
    }


    // 设置死信队列
    @Bean("queueD")
    public Queue dQueue(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
    }

    
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueD).to(xExchange).with("YD");
    }

}
1.4.3 控制器模拟生产者发送消息
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给TTL队列:{}",new Date().toString(),message);
        // 执行发送消息到X交换机RoutingKey为XA,XB
        rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列" + message);
        rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列" + message);
    }

}

1.4.4 消费者
@Slf4j
@Component
public class DeadLetterConsumer {


    // 声明队列监听器,监听死信队列QD
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到来自死信队列消息:{}",new Date().toString(),msg);
    }

}

1.4.5 测试和结果

测试方式:访问http://localhost:8088/ttl/sendMsg/测试,访问了这个界面之后控制器会发送一条消息到正常的交换机中,等待10/40秒后会进入死信队列由消费者接收。

1.5 优化延迟队列

上方设计中,每新增一个时间需求就需要新增一个队列,这样会占有较多的资源,并且不便于管理。

1.5.1 图示


优化思路:在原先项目的基础上新增一个先不对时间进行限制队列QC,QC的时间限制在生产者发送消息的时候指定。

1.5.2 队列配置

在原先的配置基础上进行修改,新增队列QC,并且绑定它和交换机X,Y的关系。

声明队列:

private static final String QUEUE_C = "QC";

配置队列:

  public Queue cQueue(){
        Map argument = new HashMap<>();
        // 设置死信交换机
        argument.put("x-dead-letter-exchange",DEAD_LETTER_Y_EXCHANGE);
        // 设置死信RoutingKey
        argument.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(argument).build();
    }

绑定交换机:

    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
1.5.3 新增控制器方法
    @GetMapping("/sendTimeAndMsg/{time}/{message}")
    public void sendTimeAndMsg(@PathVariable String time,@PathVariable String message){
        log.info("当前时间:{},发送一条延迟{}毫秒的信息给TTL队列:{}",new Date().toString(),time,message);
        rabbitTemplate.convertAndSend("X",
                "XC",
                "来自交换机X延迟为" + time +"毫秒的消息:" + message,
                msg -> {
                    //  设置时间
                    msg.getMessageProperties().setExpiration(time);
                    return msg;
                }) ;

    }
1.5.4 测试

依次访问:
http://localhost:8088/ttl/sendTimeAndMsg/20000/20sMessage,
http://localhost:8088/ttl/sendTimeAndMsg/2000/2sMessage

按照上方的延迟时间,如果相隔的时间不超过20秒,应该是先收到延迟为2秒的消息。但是显然并不是按照两秒的时间来进行接收的。这就引出了一个新的问题:RabbitMQ只会检测第一个消息是否过期,只要第一条没有到期后面的也不会优先得到执行。

1.6 基于插件延迟队列 1.6.1 优势

在使用插件之前:
使用了插件之后:

也就是说消息停留在生产者将消息发送给交换机之后,等停留的时间过了之后,交换机才会将消息发给队列,并且只需要一个队列和一个交换机即可完成。

1.6.2 安装插件

下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载之后的ez文件复制到RabbitMQ安装目录下的plugins文件夹下,然后使用cmd进入到RabbitMQ安装目录下的sbin文件夹下运行:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如图:

然后进入到web端,随便新增一个交换机,可以看到下拉框中多了一个x-delayed-message选项:

这里就表示安装完成了。

1.6.3 实战 1.6.3.1 图示

直接将消息发送给交换机,交换机等待相应时间之后再发给队列。

1.6.3.2 配置类
@Configuration
public class DelayedQueueConf {

    // 交换机
    private static final String DELAYED_EXCHANGE = "delayed.exchange";

    // 队列
    private static final String DELAYED_QUEUE = "delayed.queue";

    // rotingKey
    private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    
    @Bean("delayQueue")
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE);
    }


    
    @Bean
    public CustomExchange delayedExchange(){
        Map args = new HashMap<>();
        // 延迟类型
        args.put("x-delayed-type","direct");
        
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",
                false,false,args);
    }

    
    @Bean
    public Binding delayQueueBindingDelayExchange(@Qualifier("delayQueue") Queue queue,
                                                  @Qualifier("delayedExchange") CustomExchange customExchange){
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}

1.6.3.4 生产者控制器
    
    @GetMapping("/sendTimeAndMsgByPlugins/{time}/{message}")
    public void sendTimeAndMsgByPlugins(@PathVariable Integer time,@PathVariable String message){
        log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}",new Date().toString(),time,message);
        rabbitTemplate.convertAndSend("delayed.exchange",
                "delayed.routingKey",message,
                msg -> {
                    msg.getMessageProperties().setDelay(time);
                    return msg;
        });
    }
1.6.3.5 消费者监听
@Slf4j
@Component
public class DelayQueueConsumer {


    // 声明队列监听器,监听死信队列QD
    @RabbitListener(queues = "delayed.queue")
    public void receiveD(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到来自死信队列消息:{}",new Date().toString(),msg);
    }

}
1.6.4 测试

依次访问:
http://localhost:8088/ttl/sendTimeAndMsgByPlugins/20000/20sMessage,
http://localhost:8088/ttl/sendTimeAndMsgByPlugins/2000/2sMessage
出现如下结果,之前出现死信的情况消失了。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/757999.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号