栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ中的单个确认、批量确认、异步确认模式

RabbitMQ中的单个确认、批量确认、异步确认模式

准备代码 生产Channel的工具类

    public static Channel getChannel() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return channel;
    }
(一)、单个消息确认 重点代码
//消息确认
boolean flag = channel.waitForConfirms();
Channel channel = RabbitUtils.getChannel();
channel.queueDeclare(PUBLISH_QUEUE, true, false, false, null);
//开启发布确认
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_NUM; i++) {
    String message = i+"";
    channel.basicPublish("", PUBLISH_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    //单个消息马上进行确认
    boolean flag = channel.waitForConfirms();
    if (flag) {
       System.out.println("消息被确认 = " + message);
    }
}
long end = System.currentTimeMillis();
System.out.println("end-begin = " + (end - begin));
(二)、批量确认 重点代码
boolean flag = channel.waitForConfirms();
public static void publishMessageBatch() throws Exception {
        Channel channel = RabbitUtils.getChannel();
        channel.queueDeclare(PUBLISH_QUEUE, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();
        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_NUM; i++) {
            String message = i+"";
            channel.basicPublish("", PUBLISH_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

            if (i % 100 == 0) {
                boolean flag = channel.waitForConfirms();
                System.out.println("消息被确认 = " + message);
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("批量确认时间 :end-begin = " + (end - begin));
}
(三)、异步确认 重点代码:
//监听确认回调和未确认回调
channel.addConfirmListener(/confirm/iCallback, nackCallback);
 完整代码:
public static void publishMessageAsync() throws Exception {
        Channel channel = RabbitUtils.getChannel();
        channel.queueDeclare(PUBLISH_QUEUE, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();

        //消息确认成功回调
        ConfirmCallback confirmCallback = (deliveryTag, multiple) ->{
            System.out.println("已确认的消息" + deliveryTag);
        };
        //消息确认失败回调
        //deliveryTag :  消息的标记
        //multiple : 是否批量确认
        ConfirmCallback nackCallback = (deliveryTag, multiple)->{
            System.out.println("未确认的消息" + deliveryTag);
        };
        //开启消息监听,监听哪些消息成功,哪些消息失败
        channel.addConfirmListener(/confirm/iCallback, nackCallback);

        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_NUM; i++) {
            String message = i+"";
            channel.basicPublish("", PUBLISH_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
        }
        long end = System.currentTimeMillis();
        System.out.println("异步确认时间 :end-begin = " + (end - begin));
}
异步确认优化: 记录所有消息的标记和内容,然后判断哪些消息被接收到哪些没有

 重点代码

1、创建一个ConcurrentSkipListMap存放消息的序号和消息

ConcurrentSkipListMap outstandingConfirms= new ConcurrentSkipListMap<>();

 

public static void publishMessageAsync() throws Exception {
        Channel channel = RabbitUtils.getChannel();
        channel.queueDeclare(PUBLISH_QUEUE, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();
        
        ConcurrentSkipListMap outstandingConfirms= new ConcurrentSkipListMap<>();

        //消息确认成功回调
        ConfirmCallback confirmCallback = (deliveryTag, multiple) ->{
            if (multiple) {
                ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(deliveryTag);
                /confirm/ied.clear();
            } else {
                outstanding/confirm/is.remove(deliveryTag);
            }
            System.out.println("已确认的消息" + deliveryTag);
        };
        //消息确认失败回调
        //deliveryTag :  消息的标记
        //multiple : 是否批量确认
        ConfirmCallback nackCallback = (deliveryTag, multiple)->{
            System.out.println("未确认的消息" + deliveryTag);
            System.out.println("outstandingConfirms = " + outstanding/confirm/is);
        };
        //开启消息监听,监听哪些消息成功,哪些消息失败
        channel.addConfirmListener(/confirm/iCallback, nackCallback);

        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_NUM; i++) {
            String message = i+"";
            channel.basicPublish("", PUBLISH_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            //outstanding/confirm/is中存放序号和消息
            outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);
        }
        long end = System.currentTimeMillis();
        System.out.println("异步确认时间 :end-begin = " + (end - begin));
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676524.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号