- 1.预备操作
- 2.创建工具类,让线程停顿,用来控制程序运行速度
- 3. 编写生产者代码
- 4. 编写消费者代码
- 4.1 编写处理时间较短的消费者代码
- 4.2 编写处理时间较长的消费者代码
- 5. 运行测试代码
2.创建工具类,让线程停顿,用来控制程序运行速度与 RabbitMQ的相关操作2–轮训分发消息中前两步一致,引入相关依赖,并编写创建信道的工具类代码
public class SleepUtils {
public static void sleep(int second){
try {
Thread.sleep(1000*second);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3. 编写生产者代码
public class Task02 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//队列持久化
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.next();
//消息持久化,保存到磁盘中 MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("生产者发出消息:" + message);
}
}
}
4. 编写消费者代码
4.1 编写处理时间较短的消费者代码
public class Worker03 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息处理时间较短");
//消息的接收
DeliverCallback deliverCallback = (consumerTag, message) ->{
SleepUtils.sleep(1);
System.out.println("接收到的消息:"+new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//设置不公平分发 等于0的时候为公平
int prefetchCount = 1;
//设置预取值为2 可以积压2条数据
// int prefetchCount = 2;
channel.basicQos(prefetchCount);
//消息接收被取消时执行的内容
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息者取消消费接口回调逻辑");
};
//采用手动
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
4.2 编写处理时间较长的消费者代码
public class Worker04 {
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接收消息处理时间较长");
//消息的接收
DeliverCallback deliverCallback = (consumerTag, message) ->{
SleepUtils.sleep(30);
System.out.println("接收到的消息:"+new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//设置不公平分发
int prefetchCount = 1;
//设置预取值为5 可以积压5条数据
// int prefetchCount = 5;
channel.basicQos(prefetchCount);
//消息接收被取消时执行的内容
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息者取消消费接口回调逻辑");
};
//采用手动
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
5. 运行测试代码
分别运行两个消费者代码
运行生产者代码,并手动输入要发送的消息
查看消费者代码
可以看到设置了预取值之后,变为了不公平分发。当prefetchCount=0时,为公平分发(默认)。



