栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

MQ发布确认的三种策略,springboot多数据源原理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

MQ发布确认的三种策略,springboot多数据源原理

channel.queueDeclare(queueName, false, false, false, null);

//开启发布确认

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

channel.confirmSelect();

long begin = System.currentTimeMillis();

for (int i = 0; i < MESSAGE_COUNT; i++) {

String message = i + “”;

channel.basicPublish("", queueName, null, message.getBytes());

//服务端返回 false 或超时时间内未返回,生产者可以消息重发

boolean flag = channel.waitForConfirms();

if(flag){

System.out.println(“消息发送成功”);

}

}

long end = System.currentTimeMillis();

System.out.println(“发布” + MESSAGE_COUNT + “个单独确认消息,耗时” + (end - begin) +

“ms”);

}

}

执行结果

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。

当然,现在跟你说慢,你莫得感知,下面几种综合起来对比你就会发现他的效率有多低了!

二、批量确认发布

========

与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地 提高吞吐量。

生产者

public static void publishMessageBatch() throws Exception {

Channel channel = RabbitMqUtils.getChannel();

//队列名使用uuid来获取不重复的值,不需要自己再进行命名了。

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName, false, false, false, null);

//开启发布确认

channel.confirmSelect();

//批量确认消息大小

int batchSize = 88;

//未确认消息个数

int outstandingMessageCount = 0;

long begin = System.currentTimeMillis();

for (int i = 0; i < MESSAGE_COUNT; i++) {

String message = i + “”;

channel.basicPublish("", queueName, null, message.getBytes());

outstandingMessageCount++;

if (outstandingMessageCount == batchSize) {

channel.waitForConfirms();//确认代码

outstandingMessageCount = 0;

}

}

//为了确保还有剩余没有确认消息 再次确认

if (outstandingMessageCount > 0) {

channel.waitForConfirms();

}

long end = System.currentTimeMillis();

System.out.println(“发布” + MESSAGE_COUNT + “个批量确认消息,耗时” + (end - begin) +

“ms”);

}

执行结果

**缺点:**当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。

当然这种方案仍然是同步的,也一样阻塞消息的发布。

三、异步确认发布

========

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的。

生产者

public static void publishMessageAsync() throws Exception {

try (Channel channel = RabbitMqUtils.getChannel()) {

String queueName = UUID.randomUUID().toString();

channel.queueDeclare(queueName, false, false, false, null);

//开启发布确认

channel.confirmSelect();

ConcurrentSkipListMap outstandingConfirms = new

ConcurrentSkipListMap<>();

ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {

if (multiple) {

//返回的是小于等于当前序列号的未确认消息 是一个 map

ConcurrentNavigableMap confirmed =

outstanding/confirm/is.headMap(sequenceNumber, true);

//清除该部分未确认消息

/confirm/ied.clear();

}else{

//只清除当前序列号的消息

outstanding/confirm/is.remove(sequenceNumber);

}

};

ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {

String message = outstanding/confirm/is.get(sequenceNumber);

System.out.println(“发布的消息”+message+“未被确认,序列号”+sequenceNumber);

};

channel.addConfirmListener(ackCallback, null);

long begin = System.currentTimeMillis();

for (int i = 0; i < MESSAGE_COUNT; i++) {

String message = “消息” + i;

outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);

channel.basicPublish("", queueName, null, message.getBytes());

}

long end = System.currentTimeMillis();

System.out.println(“发布” + MESSAGE_COUNT + “个异步确认消息,耗时” + (end - begin) +

“ms”);

}

}

执行结果

很容易看出,这种方式速度快得飞起呀!

如何处理未确认的消息?

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentlinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673220.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号