Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
P:生产者:任务的发布者
C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
C2:消费者-2:领取任务并完成任务,假设完成速度快
1. 开发生产者package com.demo.workquene;
import com.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection rabbitConnection = RabbitMQUtils.getRabbitConnection();
Channel channel = rabbitConnection.createChannel();
channel.queueDeclare("work",true,false,false,null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("","work",null,(i+"hello work quene").getBytes());
}
RabbitMQUtils.closeRabbitConnection(channel,rabbitConnection);
}
}
2.开发消费者-1
package com.demo.workquene;
import com.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getRabbitConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",false,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1 "+new String(body));
}
});
}
}
3.开发消费者-2
跟消费者1的代码几乎一样
4.测试结果
总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
5.消息自动确认机制package com.demo.workquene;
import com.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getRabbitConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//设置每次接受一个消息
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1 "+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息
}
});
}
}
设置通道一次只能消费一个消息
关闭消息的自动确认,开启手动确认消息
通过线程睡眠来体现能者多劳的消费者,只需要在消费是睡眠1s即可



