栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ学习二

RabbitMQ学习二

RabbitMQ学习二

RabbitMQ高级特性

消息可靠性投递Consumer ACK消费端限流TTL死信队列延迟队列日志与监控消息可靠性分析与追踪 RabbitMQ应用问题

消息可靠性保障消息幂等性保障

RabbitMQ高级特性 消息可靠性投递

首先搭建如下代码块:
整体目录如下:

文件依次如下:
pom.xml:



    4.0.0

    com.lsd
    rabbitmq-producer-spring
    1.0-SNAPSHOT

    
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        

        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        

        
            junit
            junit
            4.12
        

        
            org.springframework
            spring-test
            5.1.7.RELEASE
        
    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.8.0
                
                    1.8
                    1.8
                
            
        
    

rabbitmq.properties:

rabbitmq.host=192.168.75.128
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.virtual-host=/

spring-rabbitmq-producer.xml文件:



    
    

    
    
    
    

    
    

测试类:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    }

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败的场景。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性模式。
1、confirm 确认模式
消息从producer到exchange则会返回一个/confirm/iCallback

    
    

首先在spring-rabbitmq-producer.xml里面将 publisher-confirms="true"加上,
然后通过如下代码测试/confirm/i模式:

   
    @Test
    public void test/confirm/i() {

        //2.定义回调
        rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("接收成功消息"+cause);
                } else {
                    System.out.println("接收失败消息"+cause);
                    //做一些处理让消息再次发送
                }
                System.out.println("/confirm/i方法执行了");
            }
        });
        rabbitTemplate.convertAndSend("test_exchange_/confirm/i","/confirm/i","message /confirm/i...");
    }


成功返回如上消息。

2、return 退回模式
消息从exchange到queue投递失败则返回一个returnCallback
第一步首先开启Return模式:

    

测试代码如下:

    
    @Test
    public void testReturn() {
        //设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        //2、设置ReturnCallBack
        rabbitTemplate.setReturnCallback(
                //message 消息对象 replyCode错误码  replyText 错误信息 exchange 交换机 routingKey路由键
                (Message message, int replyCode, String replyText, String exchange, String routingKey) -> {
                    System.out.println(message);
                    System.out.println(replyCode);
                    System.out.println(replyText);
                    System.out.println(exchange);
                    System.out.println(routingKey);
                    System.out.println("return 执行了。。。");
                });
        rabbitTemplate.convertAndSend("test_exchange_/confirm/i", "/confirm/i11", "message /confirm/i...");
    }


我们将利用这两个callback控制消息的可靠性投递

Consumer ACK

ACK指的是Acknowledge,确认。表示消费端受到消息后的确认方式。
有三种确认方式:
1、自动确认 acknowledge=“none”
2、手动确认 acknowledge=“manual”,等待业务处理无问题后,手动调用代码处理
3、根据异常情况来确认 acknowledge = “auto”
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

1、自动确认
配置消费端项目
整体项目结构如下所示:

pom文件配置



    4.0.0

    com.lsd
    rabbitmq-consumer-spring
    1.0-SNAPSHOT

    
        11
        11
    
    
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        

        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        

        
            junit
            junit
            4.12
        

        
            org.springframework
            spring-test
            5.1.7.RELEASE
        
    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.8.0
                
                    1.8
                    1.8
                
            
        
    

rabbitmq.properties配置

rabbitmq.host=192.168.75.128
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.virtual-host=/

spring-rabbitmq-consumer.xml配置:



    
    

    
    


    

    
    
        

    



消费端代码如下:

@Component
public class AckListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println(new String(message.getBody()));
    }
}

测试代码如下

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test() {
        while (true) {

        }
    }


}

然后通过生产者生产一条消息,得到如下结果

设置手动签收

@Component
public class AckListener implements ChannelAwareMessageListener {
    
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        TimeUnit.SECONDS.sleep(1);
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑。。。");
            //deliveryTag 收到消息的标签 multiple 允许签收所有的消息
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            e.printStackTrace();
            //第三个参数:requeue:重回消息队列。如果为true,则消息重新回到队列
            channel.basicNack(deliveryTag,true,true);
        }
    }
}
消费端限流

通过消费端限流可以保证系统的稳定性。

限流代码如下:
在spring-rabbitmq-consumer.xml中增加如下配置

 
    

        
        
    
@Component
public class QosListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        TimeUnit.SECONDS.sleep(5);
        //获取消息
        System.out.println(new String(message.getBody()));
        //处理业务逻辑

        //签收
     channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}

再在生产者端创建多条消息,可以看到这边消费消息是每5s消费一次,注意消费端的确认模式一定为手动确认acknowledge=manual。

TTL

TTL 全称为Time To Live(存活时间/过期时间)
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间。
代码如下:

  
        
            
            
        
    

    
        
            
        
    
   
    @Test
    public void testTtl() {
        
   
        
        //消息单独过期,消息的后置处理对象,设置一些消息的参数信息
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                //返回该消息
                return message;
            }
        };
       // rabbitTemplate.convertAndSend("test_exchange_Ttl", "ttl.hehe", "message ttl ...", messagePostProcessor);

        
        for (int i = 0; i < 10; i++) {
            if(i==5) {
                //消息单独过期
                rabbitTemplate.convertAndSend("test_exchange_Ttl", "ttl.hehe", "message ttl ...", messagePostProcessor);
            }
            else{

                rabbitTemplate.convertAndSend("test_exchange_Ttl", "ttl.hehe", "message ttl ...");

            }
        }
    }

注意点:
如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准
队列过期后会将队列所有消息全部移除
消息过期后,只有消息在队列顶端,才会去判断其是否过期(过期移除掉)

死信队列

死信队列,英文缩写:DLX。Dead Letter Exhcange(死信交换机),当消息成为Dead Message后,可以被发送到另一个交换机,这个交换机就是DLX

消息成为死信的三种情况:
1、队列长度达到限制
2、消费者拒接消息,basciNack/basciReject,并且不把消息重新放入原目标队列,requeue=false
3、原队列存在消息过期设置消息到达超时时间未被消费

队列绑定死信交换机的方式:
给队列设置参数:x-dead-letter-exchange 和x-dead-letter-routing-key
在spring-rabbitmq-producer中增加如下配置

 
    
    
        
            
            
            
            
            
            
        
    
    
        
            
        
    
    
    
    
        
            
        
    

如上设置完成之后来分别验证进入死信队列的三种情况:

   
    @Test
    public void testDlx() {
        //1、测试过期时间,死信消息
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息。。。");

        //2.测试长度限制后,消息死信,十条消息进入正常队列,剩余的直接进入死信队列,过了10s后20条消息全进入死信队列
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息。。。");
        }
        //3、测试消息拒收
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条被拒收消息。。。");
    }

第三种需要我们的消费端代码如下(注意:第三种消息拒收应该将requeue这个参数设置为false,使之不进入发送端的exchange中而是进入信息队列的exchange中)

@Component
public class DlxListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑。。。");
            int i = 3 / 0;
            //deliveryTag 收到消息的标签 multiple 允许签收所有的消息
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("拒绝接收出现异常");
            //拒绝接收消息
            channel.basicNack(deliveryTag, true, false);
        }
    }
}
延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
1、下单后,30分钟未支付,取消订单,库存回滚。
2、新用户注册成功7天后,发送短信问候。
实现方式:


但是RabbitMQ中并未提供延迟队列功能。
但是可以用:TTL+死信队列组合实现延迟队列的效果。

延迟队列订单模拟代码实现:
在生产端的spring-rabbitmq-producer.xml里面增加如下配置:

  
    
    
        
            
            
            
        
    
    
        
            
        
    

    
    
        
            
        
    

测试代码如下
生产端:

   @Test
    public void testDelay() throws InterruptedException {

        //1、发送订单消息,将来是在订单系统中,下单成功后,发送的消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息。。。。");
        //2、打印倒计时10s
        for (int i = 0; i <10 ; i++) {
            System.out.println(i+"....");
            TimeUnit.SECONDS.sleep(1);
        }
    }

消费端:
spring-rabbitmq-consumer.xml

  
    
        
    
@Component
public class OrderListener  implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑。。。");
            System.out.println("根据订单id查询订单状态。。。");
            System.out.println("判断状态是否为支付成功。。。");
            System.out.println("取消订单,回滚库存。。。");
            //手动签收
            //deliveryTag 收到消息的标签 multiple 允许签收所有的消息
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("拒绝接收出现异常");
            //拒绝接收消息
            channel.basicNack(deliveryTag, true, false);
        }
    }
}

如上listener会延迟10s才能接收到正常消息队列发送到死信队列的消息。

日志与监控

查看队列
rabbitmqctl list_queues
查看环境变量
rabbitmqctl environment
查看exchanges
rabbitmqctl list_exchanges
查看未被确认的队列
rabbitmqctl list_queues name messages_unacknowledged
查看用户
rabbitmqctl list_users
查看单个队列的内存使用
rabbitmqctl list_queues name memory
查看连接
rabbitmqctl list_connections
查看准备就绪的队列
rabbitmqctl list_queues name message_ready
查看消费者信息
rabbitmqctl list_consumers

消息可靠性分析与追踪

消息追踪:
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。
RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪
消息追踪:
firehose机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exhcange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为publish.exchangename和deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息,和消费者从queue上获取的消息。
注意:打开trace会影响消息写入功能,适当打开后请关闭
rabbitmqctl trace_on 打开firehose命令
rabbitmqctl trace_off 关闭firehose命令

消息追踪-rabbitmq_tracing
rabbitmq_tracing和firehose实现如出一辙,只不过rabbitmq_tracing多了一层GUI包装,更容易使用和管理。
插件列表
rabbitmq-plugins list
启用插件:rabbitmq-plugins enable rabbitmq_tracing

RabbitMQ应用问题 消息可靠性保障

消息补偿机制
需求:100%保障消息发送成功

流程如下:
正常情况下:
首先producer将数据保存到自己的数据库中,
producer发送消息到Consumer消费
Consumer将该消息存入DB
consumer消费完成发送确认消息到Q2
回调检查服务监听到Q2的确认消息并存入MDB
过了一段时间后producer发送一个延迟消息到Q3,回调服务监听到Q3的延迟消息过来,并对数据库里面的消息做校验,看是否有同样的消息id,如果没有的话则调用Producer重新发送消息。
如果延迟消息也发送失败了,则根据定时任务比对MDB和Conusumer以及Producer的DB找出消息重新发送。

消息幂等性保障

幂等性是指一次和多次请求某一资源的时候,对于资源本身具有相同的结果。也就是说,其任意多次执行对资源本身锁产生的影响均与一次执行的影响相同。
在MQ指,消费多条相同的消息,得到与消费该消息一次相同的结果。

乐观锁解决

第一次执行version=1
update account set money = money-500,version = version+1
where id = 1 and version = 1;
第二次执行:version = 2
update account set money = money-500,version = version+1
where id = 1 and version = 1; 此条sql不会被执行,就不会消费同样的消息

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/748457.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号