Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
2. 案例Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多个消费者进行多个消费者同时消费消息的测试。
1)生产者编写往 rabbitmq 发送 10 条消息。
package com.lijw.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); // ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/test"); //虚拟机 默认值 /
factory.setUsername("libai"); // 用户名 默认 guest
factory.setPassword("libai"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5.创建队列 Queue
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues", true, false, false, null);
//6. 发送消息 channel.basicPublish
for (int i = 0; i < 10; i++) {
String body = "hello rabbitmq~~~~ " + i;
channel.basicPublish("", "work_queues", null, body.getBytes());
}
//7. 释放资源
channel.close();
connection.close();
}
}
2)消费者1
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); // ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/test"); //虚拟机 默认值 /
factory.setUsername("libai"); // 用户名 默认 guest
factory.setPassword("libai"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
//如果没有一个名字叫work_queues的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues", true, false, false, null);
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收队列的数据 body: " + new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//不需要关闭资源,因为消费者需要持续监听队列信息
}
}
3)消费者2
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); // ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/test"); //虚拟机 默认值 /
factory.setUsername("libai"); // 用户名 默认 guest
factory.setPassword("libai"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
//如果没有一个名字叫work_queues的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues", true, false, false, null);
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收队列的数据 body: " + new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//不需要关闭资源,因为消费者需要持续监听队列信息
}
}
3. 测试
启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。
消费者1收到的消息:
消费者2收到的消息
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。



