栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【尚硅谷 RabbitMQ】3、图文详解 发布确认confirm 原理及策略(开启,单个、批量、异步)

【尚硅谷 RabbitMQ】3、图文详解 发布确认confirm 原理及策略(开启,单个、批量、异步)

1、发布确认原理

生产者将信道设置成 /confirm/i 模式,一旦信道进入/confirm/i模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外 broker也可以设置 basicAck 的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

/confirm/i模式最大的好处在于它是可以 异步 的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

2、发布确认策略 1)开启发布确认

发布确认默认是没有开启的,如果要开启,生产者需要调用通道channel的方法 /confirm/iSelect

Channel channel = connection.createChannel();
// 开启发布确认
channel./confirm/iSelect();
2)单个 发布确认

这是一种简单的确认方式,它是一种 同步 确认发布的方式,也就是发布一个消息之后,只有收到了确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内,这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有收到确认发布的消息,就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

package com.tuwer.rabbitmq./confirm/i;

import com.rabbitmq.client.Channel;
import com.tuwer.utils.RabbitMqUtils;

import java.io.IOException;


public class Producer {
    public static void main(String[] args) {
        // 单个确认发布
        one/confirm/i();
    }
    
    
    public static void one/confirm/i(){
        // 工具类
        RabbitMqUtils mqUtils = new RabbitMqUtils();
        // 获取通道
        Channel channel = mqUtils.getChannel("192.168.19.101", 5672, "admin", "admin", "/", "生产者(单个确认发布)");

        // 队列名称
        String queueName = "oneComfirmQueue";
        try {
            // 声明队列
            channel.queueDeclare(queueName, true, false, false, null);
            // 开启确认发布
            channel./confirm/iSelect();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 消息
        String message = "";
        // 消息数量
        int messageCount = 1000;
        try {
            // 开始时间
            long begin = System.currentTimeMillis();
            // 循环发送消息
            for (int i = 0; i < messageCount; i++) {
                message = (i + 1) + "";
                channel.basicPublish("", queueName, null, message.getBytes());
                // 等待确认
                boolean flag = channel.waitFor/confirm/is();
                if (flag) {
                    System.out.println("消息" + message + " 已发送!");
                }
            }
            // 结束时间
            long end = System.currentTimeMillis();
            // 总耗时
            System.out.println("发布" + messageCount + "条信息总耗时" + (end - begin) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭
            mqUtils.close();
        }
    }
}

3)批量 发布确认

单个发布确认方式非常慢,与单个等待确认消息相比,先发布一批消息,然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是 同步的,也一样阻塞消息的发布。

public class Producer {
    public static void main(String[] args) {
        // 单个确认发布
        //one/confirm/i();
        // 批量确认发布
        batch/confirm/i();
    }

    
    public static void batch/confirm/i() {
        // 工具类
        RabbitMqUtils mqUtils = new RabbitMqUtils();
        // 获取通道
        Channel channel = mqUtils.getChannel("192.168.19.101", 5672, "admin", "admin", "/", "生产者(批量确认发布)");

        // 队列名称
        String queueName = "batchComfirmQueue";
        try {
            // 声明队列
            channel.queueDeclare(queueName, true, false, false, null);
            // 开启确认发布
            channel./confirm/iSelect();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 消息
        String message = "";
        // 消息数量
        int messageCount = 1000;
        // 批量确认消息数量
        int batchSize = 100;
        try {
            // 开始时间
            long begin = System.currentTimeMillis();
            // 循环发送消息
            for (int i = 0; i < messageCount; i++) {
                message = (i + 1) + "";
                channel.basicPublish("", queueName, null, message.getBytes());

                // 批量确认
                if (i % batchSize == 99) {
                    // 等待确认
                    boolean flag = channel.waitFor/confirm/is();
                    if (flag) {
                        System.out.println((i + 1) + "条消息已发送!");
                    }
                }
            }
            // 结束时间
            long end = System.currentTimeMillis();
            // 总耗时
            System.out.println("发布" + messageCount + "条信息总耗时" + (end - begin) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭
            mqUtils.close();
        }
    }
}


4)异步 发布确认

异步确认虽然编程逻辑比上两个要复杂,但是 性价比最高,无论是可靠性还是效率都没得说,它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功

public class Producer {
    public static void main(String[] args) {
        // 单个确认发布
        //one/confirm/i();
        // 批量确认发布
        //batchConfirm();
        // 异步确认发布
        async/confirm/i();
    }

    
    public static void async/confirm/i() {
        // 工具类
        RabbitMqUtils mqUtils = new RabbitMqUtils();
        // 获取通道
        Channel channel = mqUtils.getChannel("192.168.19.101", 5672, "admin", "admin", "/", "生产者(异步确认发布)");

        // 队列名称
        String queueName = "asyncComfirmQueue";
        try {
            // 声明队列
            channel.queueDeclare(queueName, true, false, false, null);
            // 开启确认发布
            channel./confirm/iSelect();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 消息
        String message = "";
        // 消息数量
        int messageCount = 1000;

        // ----- 添加确认监听器 -----
        // 确认回调
        /confirm/iCallback ackCallback = (deliveryTag, multiple) -> {
            System.out.println("消息" + deliveryTag + "已确认发布!" + multiple);
        };
        // 未确认回调
        /confirm/iCallback nackCallback = (deliveryTag, multiple) -> {
            System.out.println("消息" + deliveryTag + "未确认发布!");
        };
        // 确认监听器
        channel.add/confirm/iListener(ackCallback, nackCallback);

        try {
            // 开始时间
            long begin = System.currentTimeMillis();
            // 循环发送消息
            for (int i = 0; i < messageCount; i++) {
                message = (i + 1) + "";
                channel.basicPublish("", queueName, null, message.getBytes());
            }
            // 结束时间
            long end = System.currentTimeMillis();
            // 总耗时
            System.out.println("【异步确认】发布" + messageCount + "条信息总耗时" + (end - begin) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭
            //mqUtils.close();
        }
    }
}

未确认信息处理

新建 未确认消息Map (并发Map),保存未确认的信息(因为消息的发送和确认是不同的线程,所以用并发Map)发送消息前,把该消息存入 未确认消息Map中确认回调时,从未确认消息Map中删除已确认的消息最后Map中剩下的就是未确认的消息

public static void async/confirm/i() {
    // 工具类
    RabbitMqUtils mqUtils = new RabbitMqUtils();
    // 获取通道
    Channel channel = mqUtils.getChannel("192.168.19.101", 5672, "admin", "admin", "/", "生产者(异步确认发布)");

    // 队列名称
    String queueName = "asyncComfirmQueue";
    try {
        // 声明队列
        channel.queueDeclare(queueName, true, false, false, null);
        // 开启确认发布
        channel./confirm/iSelect();
    } catch (IOException e) {
        e.printStackTrace();
    }

    // 消息
    String message = "";
    // 消息数量
    int messageCount = 1000;

    // 保存未确认信息:因为发布线程和确认线程是不同的,所以用并发工具类
    ConcurrentSkipListMap noConfirmMap = new ConcurrentSkipListMap<>();

    // ----- 添加确认监听器 -----
    // 确认回调
    /confirm/iCallback ackCallback = (deliveryTag, multiple) -> {
        if (multiple) {
            // 批量确认
            // 得到批量确认信息Map
            ConcurrentNavigableMap confirmMap = no/confirm/iMap.headMap(deliveryTag);
            // 清空已经确认的
            /confirm/iMap.clear();
        } else {
            // 非批量确认
            // 删除当前确认的信息
            no/confirm/iMap.remove(deliveryTag);
        }
        System.out.println("消息" + deliveryTag + "已确认发布!" + multiple);
    };
    // 未确认回调
    /confirm/iCallback nackCallback = (deliveryTag, multiple) -> {
        String m = no/confirm/iMap.get(deliveryTag);
        System.out.println("【未确认消息】" + deliveryTag + " : " + m);
    };
    // 确认监听器
    channel.add/confirm/iListener(ackCallback, nackCallback);

    try {
        // 开始时间
        long begin = System.currentTimeMillis();
        // 循环发送消息
        for (int i = 0; i < messageCount; i++) {
            message = (i + 1) + "";
            // 把下一条要发送的信息存入未确认Map中
            no/confirm/iMap.put(channel.getNextPublishSeqNo(), message);
            // 发送
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        // 结束时间
        long end = System.currentTimeMillis();
        // 总耗时
        System.out.println("【异步确认】发布" + messageCount + "条信息总耗时" + (end - begin) + "ms");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 关闭
        //mqUtils.close();
    }
}

为什么不直接在未确认回调方法中添加呢?

直接在未确认回调方法 ConfirmCallback nackCallback 中,把未确认的信息添加到Map中,这样就不用把所有信息都加入,再一个个地删除确认的消息!这种思路是正确的,但方法行不通!因为Map中存的有key(消息编号)和value(消息内容),但是回调方法中只能得到消息编号deliveryTag,得不到消息内容。

如果回调方法中可以得到消息的内容就可以,以后再深入探究!未确认的消息不是永远都不确认,如果重发之后,确认了,还是要在确认回调方法中删除的。

5)速度对比
发布方式特点
单独发布同步等待确认,简单,但吞吐量非常有限
批量发布批量同步等待确认,简单,合理的吞吐量,一旦出现问题,很难推断出是那条消息出现了问题。
异步处理最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780350.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号