栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ消息中间件技术精讲(三)

RabbitMQ消息中间件技术精讲(三)

文章目录

第三章 `RabbitMQ`高级特性

3.1-消息如何保障100%的投递成功方案3.2-幂等性概念及业界主流解决方案3.3-`/confirm/i`确认消息详解3.4-`Return`消息机制3.5-消费端自定义监听3.6-消费端的限流策略3.7-消费端`ACK`与重回队列机制3.8-`TTL`消息详解3.9-死信队列`DLX`

第三章 RabbitMQ高级特性 3.1-消息如何保障100%的投递成功方案

    生产端的可靠性投递

    保障消息的成功发出保障MQ节点的成功接收发送端收到MQ节点(Broker)确认应答完善的消息进行补偿机制

    BAT/TMD大厂解决方案

    消息落库,对消息状态进行打标(缺点,在高并发场景下对数据库压力过大)

      生产者将业务数据和消息入库,status默认为0生产者发送消息到MQ消费者进行消息确认正常消费情况下,update消息状态为1,表示正常消费,如果出现网络闪断等问题,则消息状态为0不变定时任务去数据库查询状态为0的消息针对此类数据,进行数据重发,重复2-3-4-5-6步骤统计重复次数3次以上的消息,不再进行重发,update消息状态为2,等待下一步处理

    消息的延迟投递,做二次确认,异步回调检查

 1. 生产者(上游服务)先将业务数据,比如订单数据入库,之后再发送消息到MQ中,而且不加事务,因为事务在这影响性能(绿色线)
 2. 定时在5分钟后,发送延迟检查消息(红色线)
 3. 消费者(下游服务)监听指定队列,消费消息(绿色线)
 4. 如果消费者成功接收消息后,将主动发送一条确认消息到MQ(蓝色线)
 5. 回调服务监听指定队列,该队列监听消费者发送的确认消息,如果成功收到确认消息,表示消息正常消费,则进行入库处理(黑色线)
 6. 回调服务监听指定队列,该队列监听生产者发送的延迟检查消息,5分钟后接收到消息,回调服务在数据库中检查这条消息是否被正常消费,如果一切正常,则不返回消息(红色线)
 7. 如果这条消息未正常消费,则回调服务调用RPC服务(rpc-远程服务调用),通知上游服务(生产者)进行重发消息(紫色线)
3.2-幂等性概念及业界主流解决方案

    引入

    借鉴数据库乐观锁机制:执行一条更新库存SQL

    //高并发情况下,同时两条数据更新语句,通过version来预防超卖
    update T_goods set count = count -1 ,version = version +1 where version = 1
    

    消费端实现幂等,就表示,即使消费端收到多条一样的消息,永远只会消费一次

    消费端幂等性保障

    场景:在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

      消费端实现幂等,就表示,即使消费端收到多条一样的消息,永远只会消费一次

    业务主流幂等性操作

    唯一ID + 指纹码 机制,利用数据库主键去重

    //指纹码,可以是时间戳,可以是区别码,id为主键唯一
    select count(1) from t_order where id = 唯一id+指纹码
    //查询这个主键是否有值,无则插入数据,有则说明消息已消费入库
    //好处:实现简单
    //缺点:高并发下有数据写入的性能瓶颈
    //解决方案:根据ID进行分库分表进行算法路由,进行分压分流
    

    利用Redis的原子性去实现

    使用redis进行幂等,
    利用redis set 之后,判断isexist()判断是否存在
    另外需要考虑问题
    1.我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存之间如何做到原子性?数据一致性
    2.如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
    
3.3-/confirm/i确认消息详解

    理解/confirm/i消息确认机制

    消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答

    生产者接收应答,用来确认这条消息是否正常发送到Broker,这种方式也是RabbitMQ消息的可靠性投递的核心保障

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OXi7Wl6e-1645578206551)(img/confirm/i消息确认.png)]

    实现/confirm/i确认消息

      在channel上开启确认模式,channel.confirmSelect()在channel上添加监听,add/confirm/iListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理

    代码演示

    // 生产者端
    public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            connectionFactory.setHost("118.126.65.50");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            channel./confirm/iSelect();
            String exchangeName = "test_/confirm/i_exchange";
            String routingKey   = "test_routing_key";
            String exchangeType = "direct";
            String queueName    = "test_/confirm/i_queue";
            String msg          = "confirm message";
    
            channel.exchangeDeclare(exchangeName,exchangeType,true,false,null);
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName,exchangeName,routingKey);
    
            channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
            channel.add/confirm/iListener(new /confirm/iListener() {
                // broker (消息代理,就是MQ) 当消息投递到了所匹配的队列之后,broker就会发送一个确认信号,给到生产者
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println("======有ack=========");
                }
    
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    // 没有ack情况,磁盘占满、队列满等
                    System.out.println("======没有ack=========");
                }
            });
        }
    
    // 消费者端
    public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            connectionFactory.setHost("118.126.65.50");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setVirtualHost("/");
    
    
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
    
            String exchangeName = "test_/confirm/i_exchange";
            String routingKey   = "test_routing_key";
            String queueName    = "test_/confirm/i_queue";
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收消息-" +new String(body));
                }
            };
            channel.basicConsume(queueName,true,consumer);
        }
    
3.4-Return消息机制

    机制理解

    Return Listener用于处理一些不可路由的消息

    生产者通过指定一个Exchange和RoutingKey,把消息送到某个队列中,然后消费者监听队列,进行消费处理

    在某些情况下,如果生产者在发送消息时,当前的Exchange不存在或者指定的RoutingKey路由不到指定队列,这个时候要监听这种不可达消息,就要使用Return Listener

    基础API配置

    Mandatory:如果是true,则监听器会接收到路由不可达的消息,然后进行后续处理;如果是false,那么broker会自动删除该消息 (翻译:adj强制的,n代理人)

    参数设置 public void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)

    channel.addReturnListener(new ReturnListener() {
            //replyCode-响应码
            //replyText-响应文本
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                System.out.println(new String(bytes));
            }
        });
    //如果消息正常发出,或者设置成false,则return listener不会监听,只有设置成true时才会监听错误返回
    channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,true,properties,msg.getBytes());
    
    
    replyCode:312
    replyText:NO_ROUTE
    exchange:exchange-01
    routingKey:routingKey-02
    body:hello RabbitMq return
    
3.5-消费端自定义监听
//4-创建消费者
    DefaultConsumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("msg-"+new String(body));
        }
    };
    //5-消费数据-队列名称,设置是否自动签收,消费者
    channel.basicConsume(QUEUE_NAME,true,consumer);
3.6-消费端的限流策略

    场景引入

    RabbitMQ服务器有1万条未处理的消息,此时打开消费端服务器,则会出现巨量消息瞬时全部推送,单个消费者端无法同时处理

    解决方案

    RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数量的消息未被确认前(通过基于consumer或者channel设置Qos的值),不进行消费新的数据

    消费端一定不要设置成自动签收,而是手动签收

    public void basicQos(int prefetchSize, int prefetchCount, boolean global)
    //prefetchSize:单条消息大小限制,一般设置0表示不限制
    //prefetchCount:一般是1,告诉RabbitMQ不要同时给一个消费者推送多余N个消息,如果N个消息返回了ack,则继续发送消息,如果没有则不要继续发送消息
    //global:一般false。true表示应用到channel级别,false表示应用到consumer级别,
    

    代码示例

    // 生产端不变
    // 消费者 要改变
    DefaultConsumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("msg-"+new String(body));
            //public void basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };
    channel.basicQos(0, 1, false);
    //限流方法,一定要将autoAck设置成false,表示手动签收
    channel.basicConsume(QUEUE_NAME,false,consumer);
    
3.7-消费端ACK与重回队列机制

    消费端手工ACK和NACK(返回ack表示成功消费,nack表示没有正常消费)

    消费端进行消费,如果超过count次数后,一直返回nack,则可能是由于业务逻辑异常导致,此时将消息添加到日志中,然后进行补偿发送如果由于服务器宕机等严重问题,那就需要手工进行ACK保障消费端消费成功

    消费端重回队列

    消费端重回队列,是为了没有处理成功的消息,把消息重新传递给Broker,将该条消息放入此队列尾端,重新发送

    一般实际使用中,都会关闭重回队列,设置成false

    //public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
    	//						消息标签,是否批量,是否重回队列
    channel.basicNack(envelope.getDeliveryTag(), false, false);
    

    代码演示

    // 生产端代码
    for (int i = 0; i < 5; i++) {
            String msg = "hello RabbitMq " + i;
            Map header = new HashMap<>();
            header.put("num",i);
            // 设置自定义参数
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(header)
                    .build();
        	// 发布消息
            channel.basicPublish("exchange-01","routingKey-01",true,properties,msg.getBytes(StandardCharsets.UTF_8));
       }
    
    // 消费端代码
    // 3.创建信道
    Channel channel = connection.createChannel();
    // 4.创建消费者
    DefaultConsumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            // 进行模拟判断,如果自定义属性的header头中,num参数为0,则返回重回队列中,不进行消费
            if((Integer) properties.getHeaders().get("num") == 0){
                // 消息唯一标签,是否批量处理,是否返回重回队列
                // 观察管控台和控制台,会发现一直打印第0个消息,因为一直在重回队列中不停的发送,一直没有消费,管控台上unacked一直有该条数据
                channel.basicNack(envelope.getDeliveryTag(),false,true);
            }else{
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
            System.out.println("消息-"+new String(body));
        }
    };
    // 5. 消费数据,将第二个参数autoACK设置为false,表示不进行自动签收,而是手动签收
    channel.basicConsume("queue-01",false,consumer);
    
3.8-TTL消息详解
    TTL(time to live)生存时间

    RabbitMQ支持消息的过期时间,在消息发送时可以进行指定RabbitMQ支持队列中消息过期时间,从消息进入队列开始计算,只要超过了队列的超时时间配置,则消息自动清除

3.9-死信队列DLX

    死信队列 (DLX dead-letter-exchange)

    利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLXDLX也是一个正常的Exchange,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置了某个队列的属性当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的DLX死信队列上,进而被路由到另一个队列可以监听这个队列中消息进行相应的处理,这个特性可以弥补RabbitMQ3.0之前支持的immediate参数功能

    消息变成死信的情况

    消息被拒绝(basic.reject/basic.nack)并且requeue = false(上面重回队列的设置参数,false表示不再重回队列)TTL过期队列达到最大长度

    死信队列设置

    首先需要设置死信队列的Exchange和Queue,然后进行绑定

    Exchange 比如 dlx.exchangeQueue 比如 dlx.queueRoutingKey #

    然后正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可

    arguments.put("x-dead-letter-exchange","dlx.exchange")

    代码演示

      消费端,创建正常的交换机、队列、并绑定,再创建死信交换机、死信队列、绑定,同时正常队列要设置参数绑定启动消费端,通过管控台,查看是否正常创建队列和交换机关闭消费端,模拟死信队列消息超时的情况生产端,发布消息,但设置消息过期时间为10s启动生产端,会发现5条消息一直堆积在commonQueue中,当消息过期后,5条消息自动转移到dlx.queue中,模拟成功,代码如下:
    // 消费端
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 2.创建连接
            Connection connection = connectionFactory.newConnection();
            // 3.创建信道
            Channel channel = connection.createChannel();
            // 4.创建交换机
            String commonExchangeName = "commonExchange";
            String commonExchangeType = "topic";
            String commonQueueName    = "commonQueue";
            String commonRoutingKey   = "common.#";
            // 交换机,交换机类型,是否持久化,是否自动删除,自定义参数
            channel.exchangeDeclare(commonExchangeName,commonExchangeType,true,false,null);
            // 5.创建队列
            // 队列,是否持久化,是否排他性,是否自动删除,自定义参数
            HashMap arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange","dlx.exchange");
            channel.queueDeclare(commonQueueName,true,false,false,arguments);
            // 6.交换机与队列绑定
            channel.queueBind(commonQueueName,commonExchangeName,commonRoutingKey);
    
            // 7.声明死信队列
            String dlxExchangeName = "dlx.exchange";
            String dlxExchangeType = "topic";
            String dlxQueueName    = "dlx.queue";
            String dlxRoutingKey   = "#";
            channel.exchangeDeclare(dlxExchangeName,dlxExchangeType,true,false,null);
            channel.queueDeclare(dlxQueueName,true,false,false,null);
            channel.queueBind(dlxQueueName,dlxExchangeName,dlxRoutingKey);
    
            // 4.创建消费者
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息-"+new String(body));
                }
            };
            // 5. 消费数据  队列名,是否自动签收,消费者对象
            channel.basicConsume(commonQueueName,false,consumer);
            // 8.关闭连接
            //channel.close();
            //connection.close();
        }
    }
    
    // 生产端
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 2.创建连接
            Connection connection = connectionFactory.newConnection();
            // 3.创建信道
            Channel channel = connection.createChannel();
            String commonExchangeName = "commonExchange";
            String commonRoutingKey   = "common.#";
            // 4.发送数据 (String exchange, String routingKey, BasicProperties props, byte[] body)
            for (int i = 0; i < 5; i++) {
                String msg = "hello RabbitMq  DLX" + i;
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF-8")
                        .expiration("10000")
                        .build();
                channel.basicPublish(commonExchangeName,commonRoutingKey,true,properties,msg.getBytes(StandardCharsets.UTF_8));
            }
            // 5.关闭连接
            //channel.close();
            //connection.close();
        }
    }
    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745710.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号