栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitMQ

rabbitMQ

rabbitMQ
MQ message queue 消息队列
MQ三大功能 流量消峰 应用解耦 异步处理
MQ的选择
kafka 追求的是高吞吐量,有日志采集功能肯定首选,大数据量的数据收集业务.
RocketMQ 追求的是可靠性,金融互联网领域首选 用于对可靠性要求很高的场景
RabbitMQ 追求的是功能完备,使用方便,社区活跃 适合中小公司使用

RabbitMQ负责接收,存储,转发消息
四大核心概念
producer生产者
consumer消费者
exchange交互机 只负责转发不负责存储
queue队列 存储消息

一个connection里可以有多个channel

windowns版rabbitmq的安装
官网:Erlang otp下载官网:https://www.erlang.org/downloads
rebibitmq下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/

百度网盘版链接:https://pan.baidu.com/s/1cYvOp9YCqqnzDrqUEk8eVw
提取码:dj1f
下载好以后,先安装 otp ,再安装 rabbitmq
Windows RabbitMQ环境变量配置
1、erlang安装后,默认已经配置环境变量:ERLANG_HOME,如果没有,请自行配置
环境变量名:ERLANG_HOME
值:D:softerl-23.2(安装目录)
2、RabbitMQ环境变量配置
增加RabbitMQ安装目录的变量配置
环境变量名:RABBITMQ_SERVER
值:D:softRabbitMQ Serverrabbitmq_server-3.8.14(安装目录)
3、path变量增加:
%RABBITMQ_SERVER%sbin;

然后到 rabbitmq的sbin目录 执行 rabbitmq-plugins.bat enable rabbitmq_management
http://localhost:15672 管理页面用户名密码都是 guest

uname -a 查看当前linux系统的版本

创建队列时有几个参数:
durable 队列持久化 服务器重启后队列还能重新服务,但仅仅是队列. 消息的持久化还需在发布消息的时候明确(rabbitmqTemplate默认发送消息持久化的)
exclusive 队列是否只能被创建它的连接使用
autoDelete 自动删除,是否当没有消费者监听它时将队列删除

work queues 工作模式 一个生产者生产大量消息,多个消费者来消费,每条消息只能消费一次,多个消费者之间是竞争的关系

为了防止消息丢失,引入了消息应答机制,rabbitmq在接收到消费者的应答才会删除消息
消息应答有自动应答和手动应答
自动应答是消费者接收到消息就直接应答,处理过程中可能会出现问题,造成消息丢失.
手动应答又分三种 channel.basicAck() 用于肯定确认 channel.basicNack() 用于否定确认 channel.basicReject() 不处理该消息,直接丢弃

批量处理应选false,只确认当前的消息,可保证安全消费.批量应答在接收当前消息的应答时将信道中的其它消息一起应答了,其它消息如果消费失败会造成数据丢失

超过规定时间未应答的消息,rabbitmq会自动重新入队.但是万一消息已经被消费,只是未应答,那么重新入队将会造成重复消费,所以在服务端这里要做消息消费的幂等处理

多个消费者消费消息默认是轮询消费,但是这样的效率并不高,应该根据消费者的处理能力来消费.这就需要在消费者处设置它的消费方式
channel.basicQos(1); 消费者消费预取值,默认是0 无限制,所以默认的分发方式就是轮询 设置为1,则最多同时处理一个,有多个则分配给其他的消费者 一般设置多个

一个队列里的消息只能消费一次,但是如果想要一条数据被多次消费,那么可以用交换机发布到多个队列里

系统耦合性越高,容错性就越低,可维护性就越低

交换机的类型 direct路由 fanout广播 topics通配符
如果交换机类型是fauout routingKey是空串代表分发给与它绑定的所有队列
如果交换机类型是direct 那么是路由模式 队列和交换机的不同rountingKey都需要绑定 并且生产消息的时候就需要告知交换机rountingKey
topics通配符模式,可以实现路由和广播模式 功能强大 通配符*代表一个单词 #代表0-多个单词

spring整合rabbitmq rabbitMqTemplate 来发消息 队列 交换机 监听器 连接工厂都定义在配置文件中 消费者实现 MessageListener接口即可,一个队列至少一个监听器(监听类来监听)

spring整合rabbitmq配置文件如下: 注意模板类也是rabbit: 开头的

 
    

    

    

    
    
        
    
	

springboot的监听者直接导入依赖,配好ip 端口 账号 密码 虚拟主机 这些信息后如下代码即可

	@Component
public class Consumer {

    @RabbitListener(queues = "hello")
    public void consumer(Message msg){
        System.out.println(new String(msg.getBody()));
    }
}

生产者直接创建一个配置类,在配置类里注入 交换机 队列 绑定 对象 代码如下:

@Configuration
public class RabbitMqConfig {
    public static final String BOOT_QUEUE="boot_queue";
    public static final String BOOT_DIRECT_EXCHANGE="boot_direct_exchange";
    @Bean(BOOT_QUEUE)
    public Queue queue(){
       return QueueBuilder.durable(BOOT_QUEUE).build();
    }

    @Bean(BOOT_DIRECT_EXCHANGE)
    public Exchange exchange(){
        return ExchangeBuilder.directExchange(BOOT_DIRECT_EXCHANGE).durable(true).build();
    }

    @Bean
    public Binding binding(@Qualifier(BOOT_QUEUE)Queue queue,@Qualifier(BOOT_DIRECT_EXCHANGE)Exchange exchange){
       return BindingBuilder.bind(queue).to(exchange).with("error").noargs();
    }

rabbitMQ的高级特性

1.发布确认 publish-confirm 从生产者到交换机会 产生/confirm/iCallback
需要在配置文件 连接工厂设置中 增加开启发布确认 publisher-confirms=“true”
代码中在发消息前进行发布确认回调

RabbitTemplate.ConfirmCallback confirmCallback= ( correlationData,  ack,  cause)
        ->{
            if (ack){
                //成功
                System.out.println(JSON.toJSonString(correlationData));
            }else{
                //失败
                System.out.println("失败原因:"+cause+",并进行处理");
            }
        };
        rabbitTemplate.setConfirmCallback(/confirm/iCallback);

2.投递确认 publish-return 从交换机到队列投递失败会产生ReturnCallback
需要在配置文件 连接工厂设置中 增加开启发布确认 publisher-return=“true”
代码中在发消息前进行投递确认回调

 rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("发送失败message:"+message);
                System.out.println("发送失败replyCode:"+replyCode);
                System.out.println("发送失败replyText:"+replyText);
                System.out.println("发送失败exchange:"+exchange);
                System.out.println("发送失败routingKey:"+routingKey);
            }
        });

发送失败message:(Body:’[B@5afa03de(byte[5])’ MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
发送失败replyCode:312
发送失败replyText:NO_ROUTE
发送失败exchange:exchange_direct_spring
发送失败routingKey:111

3.消费者消息确认 ack
配置中设置手动ack acknowledge=“manual”

  
        
    

代码如下:

public class Consumer implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        try {
            System.out.println(new String(message.getBody()) );
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
        }
    }

4.消费端限流 首先保证ack是手动的,然后消费者prefetch这个值设置个具体数量.那么rabbitmq会始终保持给消费者的信道上最多设置的消息数,但是不设置的话就会一直给消费者发送,堆积在信道上

  
        
    

5.消息过期时间 队列设置时间和消息设置时间可以同时存在,以短的时间为准
整个队列设置属性 x-message-ttl 单位是毫秒 那么进入队列的每个消息都只能存活设定的时间


        
            
        
    

单个消息设置过期时间需在发消息的代码中设置

 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setExpiration("2000");
                    return message;
                }
            };
            rabbitTemplate.convertAndSend("hello_queue",(i+"hello").getBytes(),messagePostProcessor);

6.死信交换机
消息变成死信的几种情况:1.消息过期 2.消息被拒绝或Nack并且没有重新放回队列 3.超过队列长度或容量被丢弃
死信交换机和死信队列与正常的交换机和队列没甚区别
队列配置死信交换机和死信routing-key(发送死信时的具体routing-key)就可以在信息变成死信后发送给死信交换机


           
           
        

7.ttl+死信队列可以实现延迟队列的效果 订单30分钟未支付则取消订单恢复库存,用户注册7天后发送问候短信等都可以使用延迟队列

消息可靠性总结
1.生产者确认:发布确认和投递确认
2.消费者确认ack 手动ack
3.交换机 队列 消息持久化(rabbitmqTemplate发送的消息默认就是持久化的)
4.rabbitMq高可用 集群 RabbitMQ镜像集群配置 HAProxy进行负载均衡

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780444.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号