栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ简单入门

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ简单入门

一、关于MQ的介绍

消息队列MQ是一种应用之间异步通信的方式,主要是由三个部分组成:第一个是生产者(producer),生产者也就是说生产消息的这一端,然后主要是负责消息所承载业务信息的一个实例化,是我们整个消息的发起方;第二个是Broker,它是我们整个消息的服务端,主要是去处理我们这个消息单元,它负责消息的存储、投递以及队列的一些其他附加功能的实现,它是整个消息队列里面最核心的组成部分;第三个消费者(consumer),主要负责消息的消费,具体是根据消息所承载的一个信息去处理各种业务逻辑。市面上几款比较火爆的MQ有:ActiveMQ,RocketMQ,Kafka,RabbitMQ。

RabbitMQ的完整架构图:

二、确保消息发送可靠性

大家知道,RabbitMQ中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息Roueting(路由)到不同的 Queue(队列)中,再由不同的消费者去消费。

大致的流程就是这样,所以要确保消息发送的可靠性,主要从两方面去确认:

1. 消息成功到达 Exchange

2. 消息成功到达 Queue

如果能确认这两步,那么我们就可以认为消息发送成功了。 如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。

经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:

1. 确认消息到达 Exchange

2. 确认消息到达 Queue

3. 开启定时任务,定时投递那些发送失败的消息

上面提出的三个步骤,第三步需要我们根据业务逻辑实现,前两步RabbitMQ则给出了现成的解决方案。

那么,该如何确保消息成功到达RabbitMQ?RabbitMQ有两种解决方案:

1.开启事务机制

2.发送方确认机制

注意:这是两种不同的方案,不可以同时开启,只能选择其中之一。

2.1开启事务模式

Spring Boot 中开启 RabbitMQ 事务机制的方式如下: 首先需要先提供一个事务管理器,如下:

@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory)
{
return new RabbitTransactionManager(connectionFactory);
}

接下来,在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:

@Service
public class MsgService {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Transactional
    public void send() {
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JUMPING_QUEUE_NAME,"hello rabbitmq!".getBytes());
        int i = 1 / 0;
    }
}

这里注意两点:

1. 发送消息的方法上添加 @Transactional 注解标记事务

2. 调用 setChannelTransacted 方法设置为 true 开启事务模式

这就 OK 了。 在上面的案例中,我们在结尾来了个 1/0 ,这在运行时必然抛出异常,我们可以尝试运行该方法,发现消息并未发送成功。

当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:

1. 客户端发出请求,将信道设置为事务模式

2. 服务端给出回复,同意将信道设置为事务模式

3. 客户端发送消息

4. 客户端提交事务

5. 服务端给出响应,确认事务提交

上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模 式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说 都是一些高并发的项目,这个时候并发性能尤为重要。 所以,RabbitMQ还提供了发送方确认机制(publisher /confirm/i)来确保消息发送成功,这种方式,性能要远远高于事务模式。

 2.2发送方确认机制

发送方确认机制需要在application.properties中配置两行代码:

spring.rabbitmq.publisher-/confirm/i-type=correlated //配置消息到达交换器的确认回调

spring.rabbitmq.publisher-returns=true //配置消息到达队列的回调

接下来我们要开启监听:

@Configuration
public class RabbitConfig implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback {
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    public final static String MY_EXCHANGE_NAME = "my_exchange_name";
    public final static String MY_QUEUE_NAME = "my_queue_name";

    @Autowired
    RabbitTemplate rabbitTemplate;

    
    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(myQueue())
                .to(directExchange())
                .with(MY_QUEUE_NAME);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(MY_EXCHANGE_NAME, true, false);
    }

    @Bean
    Queue myQueue() {
        return new Queue(MY_QUEUE_NAME, true, false, false);
    }

    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("消息到达交换机了,消息 id:{}", correlationData.getId());
        } else {
            logger.error("消息未到达交换机,消息 id:{}", correlationData.getId());
        }
    }

    
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        logger.error("消息未到达队列:{}", returned.toString());
    }

关于这个配置类,我说如下几点:

1. 定义配置类,实现RabbitTemplate./confirm/iCallback和RabbitTemplate.ReturnsCallback两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用

2. 定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为rabbitTemplate分别 配置这两个Callback。

这就可以了。


 三、失败重试

假设现在有两种情况,第一种就是MQ宕机导致失败,第二种是消息并未到达交换机而导致的失败。

3.1自带重试机制

SpringBoot中自带相应的重试机制(retry),主要针对的是服务压根找不到MQ导致失败重试。只需要在配置文件中引入几行简单的配置:

从上往下配置含义依次是:

1.开启重试机制

2.重试起始间隔时间

3.最大重试次数

4.最大重试间隔时间

5.间隔时间乘数(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)

测试:开启工程,关掉RabbitMQ,此时尝试发送消息,就会发送失败,进而自动重试。

3.2业务重试 

业务重试主要是针对消息没有到达交换器的情况。

整体思路是这样:

1. 首先创建一张表,用来记录发送到中间件上的消息。每次发送消息的时候,就往数据库中添加一条记录。表的结构在微服务之RabbitMQ消息发送失败重试(上)的结尾。

下面三个字段额外解释一下:

status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发 送失败。

tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成 功,此时就可以开始重试了)

count:表示消息重试次数

2. 在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后

3. 在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消 息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。 4. 另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次, 如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重 试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。

大致的思路就是上面这样,当然,大家也要注意,消息是否要确保 100% 发送成功,也要看具体情况。


四、确保消费可靠性

消息确认分为自动确认和手动确认,我们分别来看。

4.1自动确认

先来看看自动确认,在 Spring Boot 中,默认情况下,消息消费就是自动确认的。 我们来看如下一个消息消费方法:

@Component
public class ConsumerDemo {
    @RabbitListener(queues = RabbitConfig.JUMPING_QUEUE_NAME)
    public void handle2(String msg) {
    System.out.println("msg = " + msg);
    int i = 1 / 0;
    }
}

通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消 息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费 的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。

4.2手动确认

要开启手动确认,需要我们首先关闭自动确认,关闭方式如下:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

 这个配置表示将消息的确认模式改为手动确认。

接下来我们来看下消费者中的代码:

@RabbitListener(queues = RabbitConfig.JUMPING_QUEUE_NAME)
public void handle3(Message message,Channel channel) {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        //消息消费的代码写到这里
        String s = new String(message.getBody());
        System.out.println("s = " + s);
        //消费完成后,手动 ack
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        //手动 nack
        try {
            channel.basicNack(deliveryTag, false, true);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

将消费者要做的事情放到一个try..catch代码块中。 如果消息正常消费成功,则执行basicAck完成确认。 如果消息消费失败,则执行 basicNack 方法,告诉RabbitMQ消息消费失败。 这里涉及到两个方法: basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的 id; 第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示 消息的 id;第二个参数 multiple 如果为false,表示仅拒绝当前消息的消费,如果为 true,则表 示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数requeue含义和前面所说的一 样,被拒绝的消息是否重新入队。

下来有一个关于幂等性的问题,详细的在使用场景中有介绍:微服务之RabbitMQ消息发送幂等性问题(下)


 五、设置消息有效期

默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。

5.1TTL

在这里引入一个新的东西,叫做TTL(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间, 那么我们可以通过设置 TTL 来实现这一需求。如果消息的存活时间超过了TTL并且还没有被消息,此时消息就会变成死信。

TTL 的设置有两种不同的方式: 1. 在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。 2. 在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。

简而言之,就是给单条消息或者给整个队列设置过期时间,根据具体的业务需求来定义。那么我们来看看给单条消息设置过期时间。需要注意的是,两种情况都设置了,以时间短的为准。

这里的内容都挺容易的,我就直接上代码吧。

 5.2死信队列

什么是死信队列呢,绑定了死信交换机的队列就叫做死信队列。死信交换机Dead-Letter-Exchange 即DLX,即用来接收死信消息(Dead Message)的,一般消息变成死信消息有 如下几种情况:

1.消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false

2.消息过期

3.队列达到最大长度

当消息在一个队列中变成了死信消息后,此时就会被发送到DLX,绑定DLX的消息队列则称为死信队列。

关于死信队列的配置:

 然后,为消息队列配置死信交换机。

这就配置好了。 将来发送到这个消息队列上的消息,如果发生了nack、reject或者过期等问题,就会被发送到DLX上,进而进入到与DLX绑定的消息队列上。

最后死信消息的消费和普通队列的消费一致,@RabbitListener注解开启队列监听即可

 


六、延迟消息队列

延迟消息队列的实现方式很多,我们先聊聊它的作用。

定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨3点去备份,这种固定时间的 定时任务我们一般采用cron表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中 的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中 有的时候也会遇到类似的需求,例如:

在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进 入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。

我买了一个智能砂锅,可以用来煮粥,上班前把素材都放到锅里,然后设置几点几分开始煮粥,这样下班后就可以喝到香喷喷的粥了,那么这个煮粥的指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了再执行。

公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。

用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。

看到这里 ,大家是不是觉得延迟队列很好用,我也是这样认为的,那么如何定义一个延迟队列,这里我要用的是DLX实现延迟队列。

DLX延迟队列实现的思路也很简单,就是我们所说的 DLX(死信交换机)+TTL(消息超时时间)。我们可以把死信队列就当成延迟队列。具体来说:

假如一条消息需要延迟30分钟执行,我们就设置这条消息的有效期为30分钟,同时为这条消息配置死信交换机和死信队列,并且不为这个消息队列设置消费者,那么30分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。

那么我们该如何做呢?

首先我们先定义一个普通队列和一个死信队列。

@Configuration
public class RabbitConfig {
    public static final String MY_EXCHANGE = "my_exchange";
    public static final String MY_QUEUE = "my_queue";
    //定义死信交换机
    public static final String DLX_EXCHANGE = "dlx_exchange";
    //定义死信队列
    public static final String DLX_QUEUE = "dlx_queue";

    @Bean
    Queue dlxQueue(){
        return new Queue(DLX_QUEUE,true,false,false);
    }

    @Bean
    DirectExchange dlxExchange(){
        return new DirectExchange(DLX_EXCHANGE,true,false);
    }

    @Bean
    Binding dlxBinding(){
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_QUEUE);
    }

    @Bean
    Queue myQueue(){
        //设置队列过期时间
        Map args = new HashMap<>();
        //设置队列过期时间为 30min
        args.put("x-message-ttl",1000*60*30);
        args.put("x-dead-letter-exchange",DLX_EXCHANGE);
        args.put("x-dead-letter-routing-key",DLX_QUEUE);
        return new Queue(MY_QUEUE,true,false,false,args);
    }
    @Bean
    DirectExchange directExchange(){
        return new DirectExchange(MY_EXCHANGE,true,false);
    }
    @Bean
    Binding binding(){
        return BindingBuilder.bind(myQueue())
                .to(directExchange())
                .with(MY_QUEUE);
    }
}

然后为这个死信队列配置一个消费者负责消费到达死信队列的消息,收到消息之后打印出来。

 接着我们便可以在单元测试发送消息。

 这里偷懒使用了消息有效期的代码,不影响测试结果。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781751.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号