栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

2021-10-08

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

2021-10-08

RabbitMQ的发布确认 发布确认的概念:

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的
消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker
就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队
列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传
给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置
basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信
道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调
方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消
息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认分为:

  • 单个发布确认
  • 批量发布确认
  • 异步发布确认
三种发布确认代码如下
package com.sjf.rabbitmq.four;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iCallback;
import com.sjf.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;


public class /confirm/iMessage {

    //批量发消息的个数
    public static final int message_count = 1000;
    public static void main(String[] args) throws Exception{
        //1.调取单个确认
       // /confirm/iMessage.publishMessageIndividually(); //发布1000个单独确认消息,耗时:589ms
        //2.调用批量确认
     ///confirm/iMessage.publishMessageBatch();//发布1000个批量确认消息,耗时:89ms
        //3.调用异步确认
        /confirm/iMessage.publishMessageAsync();//发布1000个异步确认消息,耗时:42ms
    }

    //单个确认
    public static void publishMessageIndividually() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel./confirm/iSelect();
        //开始时间是多少
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < message_count; i++) {
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //单个消息就马上发布确认
            boolean flag = channel.waitFor/confirm/is();
            if(flag){
                System.out.println("消息发送成功");
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+message_count+"个单独确认消息,耗时:"+(end-begin)+"ms");
    }

    //批量发布确认
    public static  void publishMessageBatch() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel./confirm/iSelect();
        //开始时间是多少
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize = 100;

        //批量发消息
        for (int i = 0; i < message_count; i++) {
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            if(i%batchSize == 0) {
                //每100条消息进行发布确认
                channel.waitFor/confirm/is();
            }
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+message_count+"个批量确认消息,耗时:"+(end-begin)+"ms");
    }

    //异步发布确认
    public static void publishMessageAsync()throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //队列声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel./confirm/iSelect();

        
        ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();




        //消息确认成功 回调函数
        //参数var1为消息的标记,var3为是否为批量确认
        /confirm/iCallback confirmCallback = (var1, var3)->{
            if(var3){
                //删除已经确认的消息
                ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(var1);
                /confirm/ied.clear();
            }else{
                outstanding/confirm/is.remove(var1);
            }
            System.out.println("确认的消息:"+var1);
        };
        //消息确认失败 回调函数
        /confirm/iCallback confirmNoCallback = (var1, var3)->{
            String message = outstanding/confirm/is.get(var1);
            System.out.println("未成功确认的消息的tag:"+var1+":"+message);
        };
        //准备消息的监听器,异步监听消息的发送确认
        channel.add/confirm/iListener(/confirm/iCallback,/confirm/iNoCallback); //异步通知

        //开始时间是多少
        long begin = System.currentTimeMillis();
        //批量发送消息
        for (int i = 0; i < message_count; i++) {
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //此处记录下所有要发送的消息,消息的总和
            outstanding/confirm/is.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+message_count+"个异步确认消息,耗时:"+(end-begin)+"ms");
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/306629.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号