栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Day7 rabbitmq基础(二)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Day7 rabbitmq基础(二)

整合springboot

引入依赖

        
            org.springframework.boot
            spring-boot-starter-amqp
            2.2.10.RELEASE
        

配置文件

spring:
  rabbitmq:
    host: 192.168.137.128
    port: 5672
    username: root
    password: root
死信队列

没有被及时消费的消息存放的队列,消息没有被消费主要有以下原因:

a.消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

b.TTL(time-to-live) 消息超时未消费

c.达到最大队列长度

消息变成死信后,会被重新投递(publish)到另一个交换机上(Exchange),这个交换机往往被称为DLX(dead-letter-exchange)“死信交换机”,然后交换机根据绑定规则转发到对应的队列上,监听该队列就可以被重新消费。

生产者–>发送消息–>交换机–>队列–>变成死信队列–>DLX交换机–>队列–>监听–>消费者

在声明普通queue时加入props

params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key", "222");

要收到死信交换机的消息,要以此时指定的"x-dead-letter-routing-key"去指定死信队列的路由键

TTL(time-to-live) 消息超时未消费

更改生产者参数

设置生产者发送消息的TTL

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i < 10; i++) {
            channel.basicPublish(consumer01.NORMAL_EXCHANGE,"222",properties,("message"+i).getBytes(StandardCharsets.UTF_8));
        }

达到最大队列长度

params.put("x-max-length",3);               设置普通队列的长度,当队列中充满积压消息时,消息就会被转到死信队列

消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

手动应答消息,并在拒绝应答消息将重新放回queue设为false
 channel.basicReject(message.getEnvelope().getDeliveryTag(),false);

延时队列,用于存放需要在指定时间内被处理的元素

可以实现定时任务的功能

例如:

实现延时队列


创建如上图所示的队列和交换机


@Configuration
public class RabbitConfig {
    public static final String DIRECT_EXCHANGE = "X";
    public static final String QA_QUEUE = "QA";
    public static final String QB_QUEUE = "QB";
    public static final String DEAD_EXCHANGE = "Y";
    public static final String DEAD_QUEUE = "QD";

    @Bean
    public DirectExchange direct_exchange(){
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    @Bean
    public DirectExchange dead_exchange(){
        return new DirectExchange(DEAD_EXCHANGE);
    }
    //自定义过期时间的消息发送
    @Bean
    public Queue ttl_queue(){
        HashMap map = new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable("ttl_queue").withArguments(map).build();
    }
    @Bean
    public Queue dead_queue(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    @Bean
    public Queue qa_queue(){
        return QueueBuilder.durable(QA_QUEUE)
                .ttl(10000)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey("YD")
                .build();
    }

    @Bean
    public Queue qb_queue(){
        return QueueBuilder.durable(QB_QUEUE)
                .ttl(40000)
                .deadLetterRoutingKey("YD")
                .deadLetterExchange(DEAD_EXCHANGE)
                .build();
    }

    @Bean
    public Binding queueBindingA(@Qualifier("direct_exchange")DirectExchange directExchange, @Qualifier("qa_queue")Queue a, @Qualifier("qb_queue")Queue b){
        return BindingBuilder.bind(a).to(directExchange).with("XA");
    }

    @Bean
    public Binding queueBindingB(@Qualifier("direct_exchange")DirectExchange directExchange, @Qualifier("qb_queue")Queue b){
        return BindingBuilder.bind(b).to(directExchange).with("XB");
    }

    @Bean
    public Binding queueBindingTTL(@Qualifier("direct_exchange")DirectExchange directExchange, @Qualifier("ttl_queue")Queue b){
        return BindingBuilder.bind(b).to(directExchange).with("XTTL");
    }


    @Bean
    public Binding queueBindingDead(@Qualifier("dead_exchange")DirectExchange directExchange,@Qualifier("dead_queue")Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("YD");
    }

}

用于消费延迟消息的死信队列

@Component
public class consumer {

    @RabbitListener(queues = "QD")
    public void consumeDead(Message message, Channel channel){;
        String msg = message.getBody().toString();
        System.out.println("消费来自死信队列的消息"+msg+"  "+new Date().toString());
    }
}

自定义时间的延迟队列

也可以采用QC,实现一个自定义延迟时长的延迟队列,但是会有一个问题
RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,

如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。这就导致第二个消息设置的过期时间没有意义了

安装延时队列插件来解决这个问题:

插件安装地址

将插件放到rabbitmq的plugins目录下,例如:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/plugins

开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

插件实现:通过交换机延迟指定时间

实现代码:

 @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
    }
    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map args = new HashMap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/749910.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号