栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ

RabbitMQ

死信交换机一、队列延迟

/**
 * Declares queue A as a non-durable queue with a queue-level message TTL and
 * a dead-letter target.
 *
 * <p>Messages that stay in the queue longer than {@code x-message-ttl}
 * (10 * 1000, i.e. 10 seconds in RabbitMQ's millisecond unit) are republished
 * to {@code Y_EXCHANGE} with routing key {@code "YD"}.
 *
 * @return the queue definition registered under the bean name {@code queueA}
 */
@Bean("queueA")
public Queue queueA(){
    // Typed map: withArguments expects Map<String, Object>; a raw Map compiles
    // only through an unchecked conversion.
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", Y_EXCHANGE);
    arguments.put("x-dead-letter-routing-key", "YD");
    arguments.put("x-message-ttl", 10 * 1000);
    return QueueBuilder.nonDurable(QUEUE_A).withArguments(arguments).build();
}

死信交换机二、交换机设置延迟

    /**
     * Publishes {@code msg} to exchange {@code "X"} with routing key {@code "XC"},
     * setting the per-message expiration to the {@code time} path variable.
     *
     * @param msg  message payload taken from the URL path
     * @param time expiration value copied verbatim into the message properties
     *             (logged as milliseconds)
     * @return a fixed confirmation string for the HTTP response body
     */
    @RequestMapping("/sendExpMsg/{msg}/{time}")
    @ResponseBody
    public String sengdExpMsg(@PathVariable("msg")String msg,@PathVariable("time")String time ){
        // Stamp the expiration on the outgoing message just before it is sent.
        MessagePostProcessor applyExpiration = outgoing -> {
            outgoing.getMessageProperties().setExpiration(time);
            return outgoing;
        };
        rabbitTemplate.convertAndSend("X", "XC", msg, applyExpiration);

        String sentAt = new Date().toString();
        log.info("发送延迟消息成功,发送时间为{},发送内容为{},延迟时间为{}毫秒", sentAt, msg, time);
        return "发送延迟消息成功!~~~~";
    }

延迟交换机delayedExchange配置类

/**
 * Declares the queue, exchange and binding used for delayed delivery through an
 * exchange of type {@code x-delayed-message}.
 *
 * <p>NOTE(review): the {@code x-delayed-message} exchange type is presumably
 * provided by the RabbitMQ delayed-message-exchange plugin; confirm it is
 * enabled on the broker.
 */
@Configuration
public class DelayedMessageConfig {

    public static final String DELAYED_QUEUE_NAME="delayed.queue";
    public static final String DELAYED_EXCHANGE="delayed.exchange";
    public static final String DELAYED_ROUTINGKEY="delayed.routingkey";

    /**
     * @return a non-durable queue named {@link #DELAYED_QUEUE_NAME}
     */
    @Bean("queueDelayed")
    public Queue queueDelayed(){
        return QueueBuilder.nonDurable(DELAYED_QUEUE_NAME).build();
    }

    /**
     * Builds the delayed exchange. The {@code x-delayed-type} argument is set to
     * {@code direct}; the exchange itself is non-durable and not auto-deleted.
     *
     * @return the custom exchange named {@link #DELAYED_EXCHANGE}
     */
    @Bean("exchangeDelayed")
    public CustomExchange exchangeDelayed(){
        // Typed map: the CustomExchange constructor expects Map<String, Object>;
        // a raw Map compiles only through an unchecked conversion.
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",false,false,arguments);
    }

    /**
     * Binds the delayed queue to the delayed exchange with
     * {@link #DELAYED_ROUTINGKEY} and no extra binding arguments.
     *
     * @param qD  the queue bean {@code queueDelayed}
     * @param exD the exchange bean {@code exchangeDelayed}
     * @return the binding between the two
     */
    @Bean
    public Binding qDBingdingExD(@Qualifier("queueDelayed")Queue qD,@Qualifier("exchangeDelayed")CustomExchange exD){
        return BindingBuilder.bind(qD).to(exD).with(DELAYED_ROUTINGKEY).noargs();
    }
}

延迟交换机delayedExchange发送延迟消息

    /**
     * Publishes {@code msg} to the delayed exchange declared in
     * {@code DelayedMessageConfig}, setting the message delay to {@code time}.
     *
     * @param msg  message payload taken from the URL path
     * @param time delay value set on the message properties (logged as milliseconds)
     * @return a fixed confirmation string for the HTTP response body
     */
    @RequestMapping("/sendDelayedMsg/{msg}/{time}")
    @ResponseBody
    public String sengdDelayedMsg(@PathVariable("msg")String msg,@PathVariable("time")Integer time ){
        // Stamp the delay on the outgoing message just before it is sent.
        MessagePostProcessor applyDelay = outgoing -> {
            outgoing.getMessageProperties().setDelay(time);
            return outgoing;
        };
        rabbitTemplate.convertAndSend(
                DelayedMessageConfig.DELAYED_EXCHANGE,
                DelayedMessageConfig.DELAYED_ROUTINGKEY,
                msg,
                applyDelay);

        String sentAt = new Date().toString();
        log.info("发送延迟消息成功,发送时间为{},发送内容为{},延迟时间为{}毫秒", sentAt, msg, time);
        return "发送延迟消息成功!~~~~";
    }

备份交换机配置类

    /**
     * Declares the direct exchange named {@code EXCHANGE_NAME} and sets its
     * {@code alternate-exchange} argument to {@code BACKUP_EXCHANGE}, so that
     * messages the exchange cannot route are handed to the backup exchange.
     *
     * <p>The bean name is restored to {@code exchangeConfirm}; the previous
     * value {@code "exchange/confirm/i"} was a text-extraction artifact.
     *
     * @return the exchange definition
     */
    @Bean("exchangeConfirm")
    public DirectExchange exchangeConfirm(){
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).withArgument("alternate-exchange",BACKUP_EXCHANGE).build();
    }

确认回调接口及回退消息回调接口

@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnsCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData!=null?correlationData.getId():"";
        if(ack){
            log.info("接收到了id为{}的消息",id);
        }else {
            log.info("没有收到id为{}的消息",id);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("我是回退消息的回调接口,退回的消息是:{},交换机是{},routingkey是{},回退原因是{}",
                new String(returnedMessage.getMessage().getBody()),
                returnedMessage.getExchange(),returnedMessage.getRoutingKey()
                ,returnedMessage.getReplyText());
    }
}
回退回调接口及备份交换机同时存在,备份交换机优先

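pom文件

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>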

property文件

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673419.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号