栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ--发布确认高级部分

RabbitMQ--发布确认高级部分

发布/确认高级部分

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:

确认机制的方案

实现

配置

代码

package com.uin.rabbitmqspringboot.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ConfirmConfig {

    public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
    public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";
    public static final String /confirm/i_ROUTING_KEY = "key1";

    //声明交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(/confirm/i_EXCHANGE_NAME);
    }
    //声明队列

    @Bean
    public Queue queue() {
        return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
    }

    //绑定
    @Bean
    public Binding binding(@Qualifier("queue") Queue queue,
                           @Qualifier("directExchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(/confirm/i_ROUTING_KEY);
    }
}
package com.uin.rabbitmqspringboot.controller;

import com.uin.rabbitmqspringboot.config./confirm/iConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("//confirm/i")
@Slf4j
public class Producer_ConfirmController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping(value = "/sendMessage/{message}", method = RequestMethod.GET)
    public void sendMessage(@PathVariable("message") String message) {

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("1");

        rabbitTemplate.convertAndSend(/confirm/iConfig./confirm/i_EXCHANGE_NAME+"123",
                /confirm/iConfig./confirm/i_ROUTING_KEY, message,correlationData);
        log.info("发送消息内容为:{}", message);
    }

}
package com.uin.rabbitmqspringboot.listener;

import com.rabbitmq.client.Channel;
import com.uin.rabbitmqspringboot.config./confirm/iConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;




@Component
@Slf4j
public class ConfirmConsumer {

    @RabbitListener(queues = /confirm/iConfig./confirm/i_QUEUE_NAME)
    public void received(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("确认的消息:{}", msg);
    }
}

消息到达指定的交换机的确认回调

package com.uin.rabbitmqspringboot.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    //将自己写的接口实现注入到RabbitTemplate.ConfirmCallback
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //注入
    @PostConstruct
    public void invoke() {
        rabbitTemplate.setConfirmCallback(this);
    }

    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机接受到的消息,id:" + id);
        } else {
            log.info("交换机没有接受到id:{}消息!原因为:{}", id, cause);
        }
    }
}

测试

交换机和信道都没有问题的测试。

这个是交换机出现问题,如果是信道出问题呢。

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。

回退消息

通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

配置

代码

package com.uin.rabbitmqspringboot.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Component
@Slf4j
public class MyCallBack implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback {

    //将自己写的接口实现注入到RabbitTemplate.ConfirmCallback
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //注入
    @PostConstruct
    public void invoke() {
        rabbitTemplate.setConfirmCallback(this::/confirm/i);
        rabbitTemplate.setReturnsCallback(this::returnedMessage);
    }

    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机接受到的消息,id:{}", id);
        } else {
            log.info("交换机没有接受到id:{}消息!原因为:{}", id, cause);
        }
    }

    
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息{},被交换机{}退回,退回的原因是{},路由key是{}",
                new String(returnedMessage.getMessage().getBody()),
                returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
    }


}

测试

消息回退成功。

备份交换机

概念

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。

但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。

而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?

备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。

当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

实战

配置类

package com.uin.rabbitmqspringboot.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ConfirmConfig {


    //交换机
    public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    //队列
    public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARN_QUEUE_NAME = "warning.queue";
    public static final String /confirm/i_ROUTING_KEY = "key1";

    //声明交换机
    @Bean
    public DirectExchange directExchange() {

        return ExchangeBuilder.directExchange(/confirm/i_EXCHANGE_NAME)
                .durable(true)
                //绑定主交换机和备份交换机
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    //声明队列
    @Bean
    public Queue /confirm/i_queue() {
        return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
    }

    @Bean
    public Queue backup_queue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    @Bean
    public Queue warning_queue() {
        return QueueBuilder.durable(WARN_QUEUE_NAME).build();
    }


    //绑定
    @Bean
    public Binding confirmBinding(@Qualifier("directExchange") DirectExchange directExchange,
                                  @Qualifier("/confirm/i_queue") Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("key1");
    }

    //备份交换机和备份队列绑定
    @Bean
    public Binding /confirm/i_backup_exchange(@Qualifier("backup_queue") Queue queue,
                                           @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    //备份交换机和报警队列绑定
    @Bean
    public Binding /confirm/i_backup_waring_exchange(@Qualifier("warning_queue") Queue queue,
                                                  @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}
package com.uin.rabbitmqspringboot.listener;

import com.rabbitmq.client.Channel;
import com.uin.rabbitmqspringboot.config./confirm/iConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class Waring_Consumer {

    @RabbitListener(queues = /confirm/iConfig.WARN_QUEUE_NAME)
    public void waring(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.error("报警!发现不可路由的消息:{}", msg);
    }
}

测试

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746905.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号