这篇文章来介绍Word Queues部分的使用
本用使用的RabbitMqUtils工具类链接:https://editor.csdn.net/md/?articleId=124615192
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务
一. 轮训分发消息1.1、启动两个工作线程轮训举个例子,有三个人在打斗地主,发牌小妹发牌,a一个,b一个,c一个.这里我们把牌看做为消息,消费者轮着进行消费
消费者代码
public class Worker01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println(" 接收到消息:"+receivedMessage);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println(consumerTag+" 消费者取消消费接口回调逻辑");
};
System.out.println("C1 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
开启运行多实例
1.2、开启生产者线程生产者代码,编写并开启
public class Task01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try(Channel channel=RabbitMqUtils.getChannel();) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" 发送消息完成:"+message);
}
}
}
}
生产者线程开启后开启两个或以上消费者线程
从控制台输入消息,观察两个消费者线程的工作,完成
二、消息应答 2.1、概念先抛出一个问题,一个消费者在接受了一个消息后,在处理这个消息中挂掉了,而任务还没有完成.这就造成了消息的丢失.
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费 者在接
收到消息并且处理该消息之后,告诉 q rabbitmq 它已经处理了,q rabbitmq 可以把该消息删除了。
应答,可以理解为我完成了这个消息,告诉MQ你可以删除了
2.2、消息应答的方法A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了
multiple 的 true 和 false 代表不同意思
true 代表批量应答 channel 上未应答的消息
比如说 channel 上有给tag 的5消息, 当前 tag 是其中一个 那么此时其他这些还未应答的消息都会被确认收到消息应答
false 同上面相比
只会应答当前进行的消息
前面也说了,使用手动应答的目的就是为了解决某一服务器挂掉而导致的消息丢失问题,而使用手动应答后,没有被应答的消息则会进行重新排队,其他未挂掉的消费者会将其完成
2.5、消息手动应答 代码SleepUtils是一个自定义的工具类,默认的Thead.sleep()默认是以毫秒为单位,而SleepUtils则是封装了一个静态方法,将参数值*1000放入Thead.sleep()中.这样则实现了以秒为单位,代码简单这里就不书写了
消费者1
package com.win.rabbitmq.three;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.win.rabbitmq.utils.RabbitUtils;
import com.win.rabbitmq.utils.SleepUtils;
public class Work01 {
public static final String QUEUE_NAME="ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接受到的信息"+new String(message.getBody()) );
SleepUtils.sleep(1);
//手动进行应答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback= consumerTag -> System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
System.out.println("C1等待接受信息");
//将第二次参数autoAck改为false,表示不自动应答
channel.basicConsume(QUEUE_NAME, false,deliverCallback,cancelCallback );
}
}
消费者2
package com.win.rabbitmq.three;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.win.rabbitmq.utils.RabbitUtils;
import com.win.rabbitmq.utils.SleepUtils;
public class Work02 {
public static final String QUEUE_NAME="ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接受到的信息"+new String(message.getBody()) );
//这里我们睡久一点,方便测试
SleepUtils.sleep(30);
//手动进行应答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback= consumerTag -> System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
System.out.println("C2等待接受信息");
//将第二次参数autoAck改为false,表示不自动应答
channel.basicConsume(QUEUE_NAME, false,deliverCallback,cancelCallback );
}
}
开启生产者线程,输入消息,中间关闭消费者2,观察原属于消费者2的处理的消息是否会由消费者1来执行
三、RabbitMQ 持久化 3.1、概念3.2、队列如何实现持久化前面我们学习了如何解决消息丢失,现在又有一个问题,如果你的RabibtMQ服务器挂掉了,在其中的队列和消息都会消失,我们该如何解决这个问题呢?
//我们只需要在声明队列的时在第二个参数加上true,即可实现队列的持久化 channel.queueDeclare(QUEUE_NAME, true, false, false, null);
注意:我们在声明持久化队列时,如果该队列已存在则会报错,这时我们把原先的队列删了即可
3.3、消息实现持久化//我们只需要在发布消息时在第三个参数加上MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("",QUEUE_NAME , MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
3.4、不公平分发
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,而有些消费者处理消息很多,有些则很慢,这就会造成资源的浪费
为了避免这种情况,我们可以设置参数 channel.basicQos(1);
channel.basicQos(1);
可以理解为有两个工人,在之前分着做,现在是抢着做.我做完一个我就做下一个
3.5、预取值//当值大于1时,则为设置预取值 channel.basicQos(2);
预取值可以理解为信道的容量吧
例:有两个仓库(信道),一个面积(预取值)可容纳卡车(消息)5辆,另一个为2辆,卡车根据面积是否可容纳进行选择仓库.
也可以这样理解,信道可同时持有消息的数量



