栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ异步发布确认

RabbitMQ异步发布确认

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。

 

public class ComfirmMessage {

    // 批量发消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //3、异步批量确认
        // 发布1000个异步确认消息,耗时36ms
        ComfirmMessage.publicMessageAsync();

    }

    public static void publicMessageAsync() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);

        // 开启发布确认
        channel.confirmSelect();
        // 开始时间
        long begin = System.currentTimeMillis();

        // 消息确认成功回调函数
        ConfirmCallback ackCallback = (deliveryTag,multiply) -> {
            System.out.println("确认的消息:"+deliveryTag);
        };

        // 消息确认失败回调函数
        
        ConfirmCallback nackCallback = (deliveryTag,multiply) -> {
            System.out.println("未确认的消息:"+deliveryTag);
        };

        // 准备消息的监听器,监听哪些消息成功,哪些消息失败
        
        channel.addConfirmListener(ackCallback,nackCallback);

        // 批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;
            channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));
        }

        // 结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时"+ (end - begin) + "ms");
    }
}

如何处理异步未确认信息?

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentlinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递

public class ComfirmMessage {

    // 批量发消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //3、异步批量确认
        // 发布1000个异步确认消息,耗时36ms
        ComfirmMessage.publicMessageAsync();

    }

    public static void publicMessageAsync() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);

        // 开启发布确认
        channel.confirmSelect();

        
        ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap<>();

        // 消息确认成功回调函数
        ConfirmCallback ackCallback = (deliveryTag,multiply) -> {
            // 删除到已经确认的消息,剩下的就是未确认的消息
            if(multiply){
                ConcurrentNavigableMap confiremed = outstanding/confirm/is.headMap(deliveryTag);
                confiremed.clear();
            }else {
                outstanding/confirm/is.remove(deliveryTag);
            }

            System.out.println("确认的消息:"+deliveryTag);
        };

        // 消息确认失败回调函数
        
        ConfirmCallback nackCallback = (deliveryTag,multiply) -> {
            // 打印一下未确认的消息都有哪些
            String message = outstanding/confirm/is.get(deliveryTag);
            System.out.println("未确认的消息是:" + message +"未确认的消息tag:" + deliveryTag);
        };

        // 准备消息的监听器,监听哪些消息成功,哪些消息失败
        
        channel.addConfirmListener(ackCallback,nackCallback);

        // 开始时间
        long begin = System.currentTimeMillis();

        // 批量发送消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;
            channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));

            // 此处记录下所有要发送的消息的总和
            outstanding/confirm/is.put(channel.getNextPublishSeqNo(),message);
        }



        // 结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时"+ (end - begin) + "ms");
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/317058.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号