栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Rabbitmq

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Rabbitmq

Rabbitmq的高级特性

1.确认机制(producer—>rabbitmq broker)
从生产者发布消息到broker的这个过程,broker接收到消息的确认。这里分两种状态

  • /confirm/i:生产者把消息发送到broker时,可能出现两种状态,1.ack(表示broker接收成功) 2.nack(表示broker拒收消息)
  • return:表示broker接收消息成功,但是没有队列去存放消息,最终会把消息退回给生产者。
channel./confirm/iSelect();
        channel.add/confirm/iListener(new /confirm/iListener() {
            @Override
            public void handleAck(long l, boolean b) throws IOException {
                //第二个参数代表接收的数据是否为批量接收,一般我们用不到。
                System.out.println("消息已被Broker接收,Tag:" + l );
            }

            @Override
            public void handleNack(long l, boolean b) throws IOException {
                System.out.println("消息已被Broker拒收,Tag:" + l);
            }
        });

2.消息可靠性投递
(producer—>rabbitmq broker—>exchange—>queue)
下面这两种情况,前提是需要在配置文件中,开始配置


  • 消息从producer到exchange会返回一个/confirm/iCallback
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean b, String s) {
                //ack 为  true表示 消息已经到达交换机
                if (b) {
                    //接收成功
                    System.out.println("接收成功消息" + s);
                } else {
                    //接收失败
                    System.out.println("接收失败消息" + s);
                    //做一些处理,让消息再次发送。
                }
            }
        });
        rabbitTemplate.convertAndSend("test_exchange_/confirm/i", "/confirm/i", "message /confirm/i...");

消息从exchange—>queue,传递失败会返回一个ReturnCallback

//这个设置为true,表示消息传递失败会退还给producer
rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("return 执行了....");

                System.out.println("message:" + message);
                System.out.println("replyCode:" + i);
                System.out.println("replyText:" + s);
                System.out.println("exchange:" + s1);
                System.out.println("routingKey:" + s2);
            }
        });
        rabbitTemplate.convertAndSend("test_exchange_/confirm/i", "/confirm/i", "message /confirm/i...");

3.Consumer Ack(消费者端的确认)
确认方式有三种:
首先需要开启确认方式的配置:


  • 自动确认:acknowledge=“none”
    自动确认:消息一旦被确认,就会从消息队列中移除
  • 根据异常确认:acknowledge=“auto”
  • 手动确认:acknowledge=“manual”
    手动确认:消费端接收到消息之后,进行相应的处理,处理成功调用channel.basicAck(),手动确认,处理失败调用channel.basicNack(),把这个消息返回给producer,可以让其重新发送消息。
@Component
public class AckListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("message = " + new String(message.getBody()));
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
        	//假设业务处理异常
            int i = 5/0;
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
            channel.basicNack(deliveryTag, true, true);
        }
        
    }
}

4.消费端限流
prefetch属性设置消费端一次拉取多少消息


5.TTL(Time To Live)
存活时间:当消息过了这个时间,还没被消费,就会被清除。
也可以对队列设置过期的时间。

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

6.死信队列
死信队列&死信交换器:DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列。
在 rabbitmq 中存在2种方可设置消息的过期时间,第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。
消息成为死信的条件:
6.1消息数量超出队列的长度
6.2消息到了过期的时间
6.3消费端拒绝消费的消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false

7.延时队列
在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
需求:用户在系统中创建一个订单,如果30分钟后,用户没有进行支付,那么自动取消订单。

声明两个交换机:都是普通交换机只是一个名字叫做普通交换机(设置过期时间),一个叫死信交换机
声明两个队列:都是普通队列只是一个名字叫做普通队列(绑定死信交换机),一个叫死信队列



        
        
            
            

            
            

            
            
            
            
        
    


        
            
        
    

 

    
    
        
            
        
    

消费者端定义一个监听者

@Component
public class OrderListener implements ChannelAwareMessageListener{}

8.幂等性
1.使用乐观锁
2.是由全局唯一的id

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/592360.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号