持久化
队列持久化消息持久化 发布确认
单个发布确认批量发布确认异步发布确认
三种确认方式代码及确认时长对比
持久化持久化和发布确认都是为了保证消息传输的可靠性以及将数据写在磁盘上。
队列持久化只需要在生产者生成队列的时候把参数变成true即可。
//生成队列 Boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
需要注意的是如果已经生成队列了,直接改参数可能会报错,需要先删除队列在重新生成才会有持久化
在生产者发布消息的时候修改第三个参数即可
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
发布确认
在创建完信道之后就可以开启
//开启发布确认 channel.confirmSelect();单个发布确认
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,缺点就是发布速度特别慢
批量发布确认单个发布确认方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
异步发布确认利用回调函数来达到消息可靠性传递的,最佳性能和资源使用,在出现错误的情况下可以很好地控制,虽然逻辑复杂但是性价比最高
public class /confirm/iMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
//1.单个确认 610ms
///confirm/iMessage.publishMessageIndividually();
//2.批量确认 130ms
///confirm/iMessage.publishMessageBatch();
//3.异步批量确认 96ms
/confirm/iMessage.publishMessageAsync();
}
//单个确认
public static void publishMessageIndividually() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel./confirm/iSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitFor/confirm/is();
if(flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) +
"ms");
}
}
//批量确认
public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel./confirm/iSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitFor/confirm/is();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitFor/confirm/is();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) +
"ms");
}
}
//异步批量确认
public static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel./confirm/iSelect();
ConcurrentSkipListMap outstandingConfirms = new
ConcurrentSkipListMap<>();
/confirm/iCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap confirmed =
outstanding/confirm/is.headMap(sequenceNumber, true);
//清除该部分未确认消息
/confirm/ied.clear();
}else{
//只清除当前序列号的消息
outstanding/confirm/is.remove(sequenceNumber);
}
};
/confirm/iCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstanding/confirm/is.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
channel.add/confirm/iListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
"ms");
}
}
异步批量确认效率最高



