栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot 使用rabbitMQ

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot 使用rabbitMQ

SpringBoot 整合 rabbitMQ
application.yml设置
spring:
 rabbitmq:
   host: localhost
   username: admin
   password: admin
创建交换机,消息队列,并进行绑定
@Configuration
public class RabbitMQConfig {
    private static final String TOPIC_NAME = "top_name";
    private static final String QUEUE_NAME = "queue_name";
    //交换机
    @Bean("queueEx")
    public Exchange queueEx(){
        return ExchangeBuilder.topicExchange(TOPIC_NAME).durable(true).build();
    }
    //消息队列
    @Bean("queue")
    public Queue queue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    //绑定
    @Bean
    public Binding queueExchange(@Qualifier("queueEx") Exchange exchange,@Qualifier("queue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("a.#").noargs();
    }
}
@Autowired
private RabbitTemplate rabbitTemplate;
生产者
public void q(){
 rabbitTemplate.convertAndSend("top_name","a.qwe","rabbitmqboot");
}
消费者
@RabbitListener(queues = "queue_name") //设置要监听的消息队列
public void listenerq(Message message,Channel channel){
 System.out.println(new String(message.getBody()));
}
消息可靠投递 Application.yml配置
rabbitmq:
   publisher-confirm-type: correlated
Java代码配置
package com.xiaoyu.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class PubConfirmAndReturn implements RabbitTemplate.ReturnsCallback,RabbitTemplate.ConfirmCallback{

   @Autowired
   RabbitTemplate rabbitTemplate;

   @PostConstruct
   public void init(){
       rabbitTemplate.setReturnsCallback(this);
       rabbitTemplate.setConfirmCallback(this);
   }
 
   //confirm确定模式
   @Override
   public void confirm(CorrelationData correlationData, boolean b, String s) {
       if (b) {
           System.out.println("消息发送到exchange成功");
       } else {
           System.out.println("失败原因:"+ s);
       }
   }
 
 //return退回模式
   @Override
   public void returnedMessage(ReturnedMessage returnedMessage) {
       System.out.println("消息送exchange到queue失败,详情为:");
       System.out.println("消息主体 message : "+new String(returnedMessage.getMessage().getBody()));
       System.out.println("描述:"+ returnedMessage.toString());
       System.out.println("消息使用的交换器 exchange : "+ returnedMessage.getExchange());
       System.out.println("消息使用的路由键 routing : "+ returnedMessage.getRoutingKey());
   }
} 
rabbitmq 整个消息投递的路径为:

producer—>rabbitmq broker—>exchange—>queue—>consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 确认模式。
消息从 exchange–>queue 投递失败则会返回一个 returnCallback退回模式 。

ack
@RabbitListener(queues = "queue_name")
public void listener(Message message, Channel channel){
 long deliveryTag = message.getMessageProperties().getDeliveryTag();
 try {
   System.out.println(new String(message.getBody()));
   System.out.println("处理业务逻辑");
   Thread.sleep(2000);
   int a = 1/0;
   //        手动签收
   channel.basicAck(deliveryTag,true);
 } catch (Exception e) {
   //  第三个参数:requeue:重回队列,如果设置为true,则消息重新回到queue,broker会重新发送消息给消费端
   try {
     channel.basicNack(deliveryTag,true,true);
   } catch (Exception ioException) {
     ioException.printStackTrace();
   }
 }
}
当消息在被消费时,如果在不手动签收则会停滞,如果在处理业务逻辑时出错,则可以设置是否重回队列
ttl过期 队列过期
@Bean("queuettl")
public Queue queueTTL(){
 //        ttl:队列的过期时间
 return QueueBuilder.durable("queue_ttl").ttl(1000000).build();
}
@Bean("exchangettl")
public Exchange exchangeTTL(){
 return ExchangeBuilder.topicExchange("exchange_ttl").durable(true).build();
}
@Bean
public Binding ttl(@Qualifier("queuettl") Queue queue,@Qualifier("exchangettl") Exchange exchange){
 return BindingBuilder.bind(queue).to(exchange).with("q.#").noargs();
}
队列过期是指:所有发送到这个队列的消息如果没有在规定的时间消费就会被处理掉(删除改消息) 消息过期
public void ttl(){
 //        消息后处理对象,设置一些消息的参数信息
 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
   @Override
   public Message postProcessMessage(Message message) throws AmqpException {
     //                设置message的信息
     message.getMessageProperties().setExpiration("10000");
     //                返回该消息
     return message;
   }
 };
 //        如果同时设置了队列过期时间和消息单独过期时间,以时间短的为准。如下列情况是到十秒同时过期
 //        队列过期会将队列内的消息全部移除
 //        消息过期后,只有消息在队列的顶端才会判断其是否过期,过期则移除
 //        队列过期
 rabbitTemplate.convertAndSend("exchange_ttl","q.aa","ttl");
 //        消息单独过期
 rabbitTemplate.convertAndSend("exchange_ttl","q.aa","ttl",messagePostProcessor);
}
消息过期是指:发送的这一条消息会被设置过期时间,正常发送的消息则不会过期。
dlx死信 造成死信的原因:
  • 过期时间

  • //.ttl(10000)    
    public void one_dlx(){
      rabbitTemplate.convertAndSend("test_exchange_dlx","test_dlx.xx","one_del....");
    }
    
  • 长度限制

    • //.maxLength(10)	设置最大为10条消息
      public void two_dlx(){
        for (int i = 0; i < 20; i++) {
          rabbitTemplate.convertAndSend("test_exchange_dlx","test_dlx.xx","two_del....");
        }
      }
      
  • 消息拒收

    • public void three_dlx(Message message, Channel channel){
        System.out.println(new String(message.getBody()));
        try {
          channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      
死信配置
//    1:声明正常队列和交换机进行绑定
@Bean("test_queue_dlx")
public Queue test_queue_dlx(){
 return QueueBuilder.durable("test_queue_dlx").deadLetterExchange("exchange_dlx").deadLetterRoutingKey("dlx.suibian").ttl(10000).maxLength(10).build();
}
@Bean("test_exchange_dlx")
public Exchange test_exchange_dlx(){
 return ExchangeBuilder.topicExchange("test_exchange_dlx").durable(true).build();
}
@Bean
public Binding test_binding_dlx(@Qualifier("test_queue_dlx")Queue queue,@Qualifier("test_exchange_dlx")Exchange exchange){
 return BindingBuilder.bind(queue).to(exchange).with("test_dlx.#").noargs();
}
//    2.声明死信队列和交换机进行绑定
@Bean("queue_dlx")
public Queue queue_dlx(){
 return QueueBuilder.durable("queue_dlx").build();
}
@Bean("exchange_dlx")
public Exchange exchange_dlx(){
 return ExchangeBuilder.topicExchange("exchange_dlx").durable(true).build();
}
@Bean
public Binding binding_dlx(@Qualifier("queue_dlx")Queue queue,@Qualifier("exchange_dlx")Exchange exchange){
 return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
}
死信队列可以理解为是被绑定正常队列的回收站,死信队列和正常队列一样,一样可以消费其中的消息
ttl+dlx延迟队列 配置
@Bean("test_queue_ttl_dlx")
public Queue test_queue_ttl_dlx(){
 return QueueBuilder.durable("test_queue_ttl_dlx").deadLetterExchange("exchange_ttl_dlx").deadLetterRoutingKey("ttl_dlx.suibian").ttl(10000).build();
}
@Bean("test_exchange_ttl_dlx")
public Exchange test_exchange_ttl_dlx(){
 return ExchangeBuilder.topicExchange("test_exchange_ttl_dlx").durable(true).build();
}
@Bean
public Binding test_binding_ttl_dlx(@Qualifier("test_queue_ttl_dlx")Queue queue,@Qualifier("test_exchange_ttl_dlx")Exchange exchange){
 return BindingBuilder.bind(queue).to(exchange).with("test_ttl_dlx.#").noargs();
}

@Bean("queue_ttl_dlx")
public Queue queue_ttl_ttl_dlx(){
 return QueueBuilder.durable("queue_ttl_dlx").build();
}
@Bean("exchange_ttl_dlx")
public Exchange exchange_ttl_dlx(){
 return ExchangeBuilder.topicExchange("exchange_ttl_dlx").durable(true).build();
}
@Bean
public Binding binding_ttl_dlx(@Qualifier("queue_ttl_dlx")Queue queue,@Qualifier("exchange_ttl_dlx")Exchange exchange){
 return BindingBuilder.bind(queue).to(exchange).with("ttl_dlx.#").noargs();
}
生产者
@GetMapping("/ttl_dlx")
@ResponseBody
public void ttl_dlx(){
 for (int i = 0; i < 10; i++) {
   rabbitTemplate.convertAndSend("test_exchange_ttl_dlx","test_ttl_dlx.xx","ttl_dlx...");
 }
}
消费者
@RabbitListener(queues = "queue_ttl_dlx")
public void ttl_dlx_li(Message message,Channel channel){
 System.out.println(new String(message.getBody()));
 try {
   channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
 } catch (IOException e) {
   e.printStackTrace();
 }
}
延迟队列:设置队列为过期队列,当改消息过期后到绑定的死信队列里面,从而实现消息的延迟
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/866008.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号