
RabbitMQ Publisher Confirms and Returns


This example uses a direct exchange.
application.yml:

server:
  port: 8000
  #session-timeout: 1800

spring:
  # RabbitMQ settings, mapped to the RabbitProperties configuration class
  rabbitmq:
    host: 127.0.0.1 # RabbitMQ server address
    port: 5672 # RabbitMQ server port
    username: guest # RabbitMQ account
    password: guest # RabbitMQ password
    publisher-returns: true              # enable the publisher return mechanism
    publisher-confirm-type: correlated   # enable the publisher confirm mechanism

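pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.7.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.rabbitmq</groupId>
    <artifactId>confirm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>confirm</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.7.RELEASE</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>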


Constants class:
 

public interface RabbitmqConstant {

       String QUEUE_NAME = "QUEUE_DEMO_DIRECT";

       String EXCHANGE_NAME = "direct_exchange";

       String ROUTING_KEY = "ROUTING_KEY_01"; 

}


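Config class: declares the exchange and the queue, then binds them with the routing key

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqDirectConfig {

    // Durable direct exchange
    @Bean("bootDirectExchange")
    public Exchange bootDirectExchange() {
        return ExchangeBuilder.directExchange(RabbitmqConstant.EXCHANGE_NAME).durable(true).build();
    }

    // Durable queue
    @Bean("bootDirectQueue")
    public Queue bootDirectQueue() {
        return QueueBuilder.durable(RabbitmqConstant.QUEUE_NAME).build();
    }

    // Bind the queue to the exchange under ROUTING_KEY
    @Bean
    public Binding bindDirectQueueExchange(@Qualifier("bootDirectQueue") Queue queue,
                                           @Qualifier("bootDirectExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitmqConstant.ROUTING_KEY).noargs();
    }
}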

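Listener class: consumes messages automatically whenever the queue contains any

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitmqListener {

    @RabbitListener(queues = RabbitmqConstant.QUEUE_NAME)
    public void listenerQueue01(Message message, Channel channel) {

        System.out.println("mess====" + message.toString());
        log.info("[onMessage][thread id: {} message: {}]", Thread.currentThread().getId(), message);
    }
}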

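Confirm callback class: note that this callback reports whether the message reached the exchange on the broker, not whether it was routed to a queue

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null when the message was sent without a CorrelationData argument
        String id = correlationData != null ? correlationData.getId() : null;
        if (!ack) {
            log.error("Message was not confirmed, correlationData={}, cause={}", id, cause);
        } else {
            log.info("Publisher received confirm, correlationData={}, ack={}, cause={}", id, ack, cause);
        }
    }
}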
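
The correlation id is what lets the publisher tell which message a confirm refers to. The sketch below shows one way to use it: remember each payload under its id and decide on ack or nack what to do with it. `PendingConfirms`, `register`, and `onConfirm` are hypothetical names and not part of Spring AMQP. The id returned by `register` would be passed as `new CorrelationData(id)`, and `onConfirm` would be called from `confirm(...)`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Spring AMQP: remembers what was sent under each
// correlation id so a confirm callback can tell which message was acked or nacked.
class PendingConfirms {

    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call just before publishing; returns the id to put into CorrelationData.
    String register(String payload) {
        String id = UUID.randomUUID().toString();
        pending.put(id, payload);
        return id;
    }

    // Call from the confirm callback. Returns the payload to retry, or null if there is none.
    String onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            return null; // the message was published without correlation data
        }
        if (ack) {
            pending.remove(correlationId); // broker accepted it, nothing more to do
            return null;
        }
        return pending.get(correlationId); // nack: keep it so the caller can resend or log it
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        String id = confirms.register("boot mq hello Direct");
        System.out.println("retry payload on nack: " + confirms.onConfirm(id, false));
        confirms.onConfirm(id, true);
        System.out.println("still pending: " + confirms.size());
    }
}
```

A random UUID per message avoids the ambiguity of a fixed id such as "1", where every confirm would look identical in the log.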

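Return callback class: note that this callback is invoked only when the message cannot reach its destination, for example when the routing key is wrong and the exchange cannot deliver the message to any queue

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("Entered the return callback");
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}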
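
To make "cannot reach its destination" concrete, here is a toy model of how a direct exchange picks queues. It is plain Java for illustration only, not RabbitMQ code, and `DirectExchangeSketch`, `bind`, `route`, and `wouldBeReturned` are hypothetical names. It assumes the mandatory flag is set, since only then is an unroutable message returned to the publisher.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange; it illustrates the routing rule only.
class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // A direct exchange delivers only to queues whose binding key equals the routing key.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    // With mandatory set, a message that matches no queue goes back to the publisher.
    boolean wouldBeReturned(String routingKey) {
        return route(routingKey).isEmpty();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("QUEUE_DEMO_DIRECT", "ROUTING_KEY_01");
        System.out.println(exchange.route("ROUTING_KEY_01"));              // the bound queue
        System.out.println(exchange.wouldBeReturned("ROUTING_KEY_01222")); // no exact match
    }
}
```

Because matching is by exact equality, appending "222" to the routing key is enough to leave the message with no target queue.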


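Controller class:

import java.text.SimpleDateFormat;
import java.util.Date;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;
    @Autowired
    private ReturnCallbackService returnCallbackService;

    @GetMapping("/sendconfirm/{msg}")
    public void sendConfirmMessage(@PathVariable("msg") String msg) {
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String mess = "boot mq hello Direct" + format.format(new Date());
        // Pass correlationData so the confirm callback receives it instead of null
        rabbitTemplate.convertAndSend(RabbitmqConstant.EXCHANGE_NAME, RabbitmqConstant.ROUTING_KEY,
                mess, correlationData);
        log.info("Request msg={}, payload sent={}", msg, mess);
    }

    @GetMapping("/sendReturn/{msg}")
    public void sendReturnMessage(@PathVariable("msg") String msg) {
        CorrelationData correlationData = new CorrelationData("1");
        // mandatory must be true, otherwise the broker silently drops unroutable messages
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(returnCallbackService);
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String mess = "boot mq hello Direct" + format.format(new Date());
        // The routing key is deliberately wrong so that the message is returned
        rabbitTemplate.convertAndSend(RabbitmqConstant.EXCHANGE_NAME, RabbitmqConstant.ROUTING_KEY + "222",
                mess, correlationData);
        log.info("Request msg={}, payload sent={}", msg, mess);
    }

}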

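The confirm callback and the return callback can also be written in a single class, as follows:

import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Register both callbacks once, after injection
    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null when the message was sent without a CorrelationData argument
        String id = correlationData != null ? correlationData.getId() : null;
        if (!ack) {
            log.error("Message was not confirmed, correlationData={}, cause={}", id, cause);
        } else {
            log.info("Publisher received confirm, correlationData={}, ack={}, cause={}", id, ack, cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("Entered the return callback");
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}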

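Controller class (it no longer registers the callbacks itself):

import java.text.SimpleDateFormat;
import java.util.Date;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendconfirm/{msg}")
    public void sendConfirmMessage(@PathVariable("msg") String msg) {
        CorrelationData correlationData = new CorrelationData("1");
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String mess = "boot mq hello Direct" + format.format(new Date());
        // Pass correlationData so the confirm callback receives it instead of null
        rabbitTemplate.convertAndSend(RabbitmqConstant.EXCHANGE_NAME,
                RabbitmqConstant.ROUTING_KEY,
                mess, correlationData);
        log.info("Request msg={}, payload sent={}", msg, mess);
    }

    @GetMapping("/sendReturn/{msg}")
    public void sendReturnMessage(@PathVariable("msg") String msg) {
        CorrelationData correlationData = new CorrelationData("1");
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String mess = "boot mq hello Direct" + format.format(new Date());
        // The routing key is deliberately wrong so that the message is returned
        rabbitTemplate.convertAndSend(RabbitmqConstant.EXCHANGE_NAME,
                RabbitmqConstant.ROUTING_KEY + "222",
                mess, correlationData);
        log.info("Request msg={}, payload sent={}", msg, mess);
    }

}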

Open in a browser: http://127.0.0.1:8000/confirm/sendReturn/heheheeh222
                   http://127.0.0.1:8000/confirm/sendconfirm/heheheeh

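Note: when sending from a Spring Boot test class, the confirm and return callbacks do not appear to run, most likely because they arrive asynchronously after the test has already finished. Trigger them through HTTP requests as shown above.
If you do want to send from a test class, register the callback inline, for example as a lambda: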
 

rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
    System.out.println("correlationData--->" + correlation);
    System.out.println(ack);
    if (ack) {
        System.out.println("Delivered and confirmed...");
        // continue with other business logic...
    } else {
        System.out.println("Delivery failed....");
        // record the failure, etc...
    }
});
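
If the callback is missed because the test ends too early, one option is to block the test until the callback has fired. The sketch below shows the idea with a `CountDownLatch`. `publishAsync` is a hypothetical stand-in for an asynchronous publish and is not a Spring API. In a real test, the `latch.countDown()` call would presumably sit inside the `rabbitTemplate.setConfirmCallback(...)` lambda.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class AwaitConfirmSketch {

    // Hypothetical stand-in for an asynchronous publish: the callback fires about
    // 100 ms later on another thread, much like a confirm arriving from the broker.
    static void publishAsync(ExecutorService broker, Consumer<Boolean> confirmCallback) {
        broker.execute(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            confirmCallback.accept(true);
        });
    }

    // Returns true only if the callback arrived with ack=true before the timeout.
    static boolean sendAndAwait(long timeoutMillis) {
        ExecutorService broker = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean acked = new AtomicBoolean(false);
        try {
            publishAsync(broker, ack -> {
                acked.set(ack);
                latch.countDown(); // releases the waiting test thread
            });
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) && acked.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            broker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("confirmed: " + sendAndAwait(2000));
    }
}
```

A timeout on `await` keeps the test from hanging forever if the confirm never arrives.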

When reposting, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/583280.html