栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ的相关操作4--发布确认

RabbitMQ的相关操作4--发布确认

目录
  • 1.预备操作
  • 2. 设置批量发消息的个数
  • 3. 编写单个确认代码
  • 4. 编写批量确认代码
  • 5. 编写异步发布确认代码
  • 6. 运行测试代码

1.预备操作

与 RabbitMQ的相关操作2–轮训分发消息中前两步一致,引入相关依赖,并编写创建信道的工具类代码

2. 设置批量发消息的个数
	//批量发消息的个数
    public static final int MESSAGE_COUNT = 1000;
3. 编写单个确认代码
//单个确认
    public static void publicMessageIndividually() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel./confirm/iSelect();
        //开始时间
        long begin = System.currentTimeMillis();
        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //单个消息马上进行发布确认
            boolean flag = channel.waitFor/confirm/is();
            if (flag){
                System.out.println("消息发送成功:" + message);
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("单个确认耗费时间:"+(end-begin) + "ms");
    }
4. 编写批量确认代码
    //批量确认
    public static void publicMessageBatch() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel./confirm/iSelect();
        //开始时间
        long begin = System.currentTimeMillis();
        //批量确认消息大小
        int batchSize = 100;
        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //批量确认,100条确认一次
            if ((i+1) % batchSize == 0 && channel.waitFor/confirm/is()){
                System.out.println("消息发送成功");
            }
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("批量确认耗费时间:"+(end-begin) + "ms");
    }
5. 编写异步发布确认代码
    //异步发布确认
    public static void publicMessageAsync() throws IOException, TimeoutException{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        //开启发布确认
        channel./confirm/iSelect();

        
        ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();

        
        /confirm/iCallback ackCallback = (deliveryTag,mutiple) -> {
            if (mutiple){//如果是累计确认,就累计删除。headMap,返回此映射的部分视图,其键值严格小于k.
                //2:删除已经确认的消息,剩下的就算未确认的消息
                ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(deliveryTag);
                /confirm/ied.clear();
            }else {//累计确认可能会有消息丢失,一般用这个单个确认
                outstanding/confirm/is.remove(deliveryTag);
            }
            System.out.println("确认的消息:"+deliveryTag);
        };
        
        /confirm/iCallback nackCallback = (deliveryTag,mutiple) -> {
            //3:编写未确认的消息
            String message = outstanding/confirm/is.get(deliveryTag);
            System.out.println("未确认的消息是:"+message + "--未确认的消息tag:" +deliveryTag);
        };

        //准备消息的监听器,监听哪些消息成功或失败
        channel.add/confirm/iListener(ackCallback,nackCallback);   //异步通知

        //开始时间
        long begin = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = ""+i;
            channel.basicPublish("",queueName,null,message.getBytes());

            //1:此处记录下所有要发送的消息(序号,信息)
            outstanding/confirm/is.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("异步确认耗费时间:"+(end-begin) + "ms");
    }
6. 运行测试代码
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        //1.单个确认
//        publicMessageIndividually();    //单个确认耗费时间:10864ms
        //2.批量确认
//        publicMessageBatch();   //批量确认耗费时间:168ms
        //3.异步批量确认
        publicMessageAsync();   //异步确认耗费时间:25ms
    }

可以发现,异步确认消耗的时间远小于其他两种

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612581.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号