栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

分布式-MQ-02 RabiitMQ高级特性及整合Spring、SpringBoot

分布式-MQ-02 RabiitMQ高级特性及整合Spring、SpringBoot

RabiitMQ高级特性及整合Spring、SpringBoot

1 Spring、Springboot整合RabbitMQ2 RabbitMQ高级特性

2.1 消息可靠性

2.1.1 生产者的消息确认机制2.1.2 消费者的消息确认签收机制2.1.3 持久化数据2.1.4 Broker高可用 2.2 消峰限流2.3 TTL(Time to Live)2.4 死信队列(Dead letter Queue/Exchange)2.5 延迟队列2.6 防止重复消费 3 集群搭建4 haproxy搭建

1 Spring、Springboot整合RabbitMQ

源码:https://gitee.com/asky-cloud-service/Daily_study.git

2 RabbitMQ高级特性 2.1 消息可靠性

消息确认机制在生产者及消费者端均有相应的机制。

2.1.1 生产者的消息确认机制

前提:需要在生产者端开启channel通道时,设置需要/confirm/i消息确认机制。
其次:设置消息投递失败的取舍机制

//开启生产者端消息确认机制
channel./confirm/iSelect();   //当时用spring或springBoot时,需要在ConnectionFactory开启
publisher-/confirm/is="true"  //开启 确认模式
rabbitTemplate.setConfirmCallback //设置回调函数。当消息发送到exchange后回调/confirm/i方法。
//在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

publisher-returns="true"    //开启 退回模式
rabbitTemplate.setReturnCallback //设置退回函数,当消息从exchange路由到queue失败后,
rabbitTemplate.setMandatory(true) //参数,则会将消息退回给producer。
//并执行回调函数returnedMessage。
    
//第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。
channel.basicPublish(RabbitConstant.EXCHANGE_TOPIC_EXCEPT,key,true,null,input.getBytes(StandardCharsets.UTF_8));

/confirm/i确认机制
/confirm/i确认机制,是确认消息发送到Exchange时,消息被接收或被拒绝(限流、异常IO等)。需要添加如下/confirm/i监听器:

        channel.add/confirm/iListener(new /confirm/iListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("[交换机接收消息id]:"+deliveryTag);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                //交换机拒绝的条件:
                //限流、队列已满、io异常等
                System.err.println("[交换机拒绝]:"+deliveryTag + "mul?"+ multiple);
            }
        });

return退回机制

return机制,是指在交换机Exchange将消息投递给Queue队列时,如果没有Queue接收,则消息将被退回。需要添加类似如下return监听器:

        channel.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return r) {
                System.err.println("===========================");
                System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
                System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
                System.err.println("Return主题:" + new String(r.getBody()));
                System.err.println("===========================");
            }
        });
2.1.2 消费者的消息确认签收机制

consumer ack
消息由队列发送至消费者时,消费者如果需要开启签收机制,

首先需要将消息的ack确认机制设置为手动: acknowledge=manual


    

使用spring配置文件时,有3种可选ack配置

自动确认:acknowledge=“none”手动确认:acknowledge=“manual”根据异常情况确认:acknowledge=“auto”

其次签收消失时,逐个签收,不可将此前队列全部接收

channel.basicAck(deliveryTag,true);
2.1.3 持久化数据

在管理台设置交换机、队列时均需要考虑是否需要持久化消费端在channel接收数据时也可以设置将数据持久化 2.1.4 Broker高可用

搭设集群服务,保证在有MQ机器宕机时,集群管理者可以选择可用的机器给业务使用。

2.2 消峰限流

MQ的限流能力,可以设置每次从MQ中拉取的消息prefetch数,保证服务器需要瞬时处理的最大活跃消息个数。当有正在处理的消息结束后,prefetch的阈值不变,可以从MQ拉取的消息数+1,则继续拉取消息。

2.3 TTL(Time to Live)

指定消息队列的存活时间,假设设置消息在10min内被消息则消息可用,超出时间队列中的该消息将被清除。

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。 2.4 死信队列(Dead letter Queue/Exchange)

从上游交换机投递至队列时,该队列拒绝当前消息,存在消息成为死信的可能,此时如果该队列绑定了死信交换机,则此时会将死信消息投递至死信交换机。

前提:如果队列需要设置死信交换机,需要设置参数:

x-dead-letter-exchangex-dead-letter-routing-key

消息成为死信的可能原因:

队列消息长度到达限制消费者拒接消费消息,并且不重回队列原队列存在消息过期设置,消息到达超时时间未被消费

2.5 延迟队列

通过TTL+死信队列,可以过滤出没有被消费的队列信息。从而达到延迟队列的功能

2.6 防止重复消费

业务侧(消费者)需要保证接口的幂等性。

乐观锁实现接口/消息幂等。

3 集群搭建 4 haproxy搭建

考虑RabbitMQ在实际生产生活的使用没有kafka和RocketMQ多,将在后续搭建kafka、rocketMQ集群,rabbitMQ不再搭建。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752111.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号