Distributing tasks among workers (the competing consumers pattern)
一 轮询模式当生产者生产能力过高时,需要引入多个消费者来消费消息,默认这些消费者采用轮询的方式消费消息,不管消费者的消费速度如何,消费者消费的消息数量贴近于一致。
生产者发送多条消息
package com.tech.rabbitmq.nospring.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.50.134");
factory.setPort(5672);
factory.setVirtualHost("/dev");
factory.setUsername("tech");
factory.setPassword("tech");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//队列名称、是否持久化、是否独占(只能有一个消费者)、是否自动删除(在没有消费者时自动删除)、其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 10; i++) {
String message = "Hello World! "+i;
//交换机名称(不写为默认交换机,路由键名称与队列名称一致才能被路由)、路由键名称、配置信息、字节数组
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
有两个消费者,一个消费者是2秒消费一条消息,一个消费者6秒消费一条消息
package com.tech.rabbitmq.nospring.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Recv1 {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.50.134");
factory.setPort(5672);
factory.setVirtualHost("/dev");
factory.setUsername("tech");
factory.setPassword("tech");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//一般是固定的,可以作为会话的名称
// System.out.println("consumerTag="+consumerTag);
//可以获取交换机、路由键等信息
// System.out.println("envelope="+envelope);
// System.out.println("properties="+properties);
System.out.println("body="+new String(body,"utf-8"));
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//false 关闭自动确认
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
package com.tech.rabbitmq.nospring.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Recv2 {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.50.134");
factory.setPort(5672);
factory.setVirtualHost("/dev");
factory.setUsername("tech");
factory.setPassword("tech");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
e.printStackTrace();
}
//一般是固定的,可以作为会话的名称
// System.out.println("consumerTag="+consumerTag);
//可以获取交换机、路由键等信息
// System.out.println("envelope="+envelope);
// System.out.println("properties="+properties);
System.out.println("body="+new String(body,"utf-8"));
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//false 关闭自动确认
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
启动两个消费者,启动发送者,得到如下结果:
两个消费者消费的消息数一样; RabbitMQ轮询着给各个消费者逐一发送消息到消费者本地,消费者各自处理本地的消息,由于分配的数量是一样的,消费速度快的消费者会优先消费完进入空闲状态二 能者多劳模式
可以通过basicQos设置RabbitMQ每次给消费者发送1条消息,等这条消息手动Ack后才发送下1条消息,这样消费慢的消费者没有ack消息,RabbitMQ不会发送下一条消息给这个消费慢的消费者,而是给到了已经完成消息Ack消费速度快的消费者,实现能者多劳。
接收端需要修改代码
int fetchCount = 1;
//设置RabbitMQ给消费者发送fetchCount条消息后,需要等待有消息确认时,再继续给该消费者发送消息,
// 发送的消息数量为消费者fetchCount-当前未消费者未ack的消息数
//当fetchCount为1时,则RabbitMQ给消费者发送1条消息,等待这条消息ack后才会发下1条消息
channel.basicQos(fetchCount);
完整代码如下:
package com.tech.rabbitmq.nospring.work.qos;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Recv1 {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.50.134");
factory.setPort(5672);
factory.setVirtualHost("/dev");
factory.setUsername("tech");
factory.setPassword("tech");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int fetchCount=1;
//设置RabbitMQ给消费者发送fetchCount条消息后,需要等待有消息确认时,再继续给该消费者发送消息,
// 发送的消息数量为消费者fetchCount-当前未消费者未ack的消息数
//当fetchCount为1时,则RabbitMQ给消费者发送1条消息,等待这条消息ack后才会发下1条消息
channel.basicQos(fetchCount);
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//一般是固定的,可以作为会话的名称
// System.out.println("consumerTag="+consumerTag);
//可以获取交换机、路由键等信息
// System.out.println("envelope="+envelope);
// System.out.println("properties="+properties);
System.out.println("body="+new String(body,"utf-8"));
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//false 关闭自动确认
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
package com.tech.rabbitmq.nospring.work.qos;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Recv2 {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.50.134");
factory.setPort(5672);
factory.setVirtualHost("/dev");
factory.setUsername("tech");
factory.setPassword("tech");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int fetchCount = 1;
//设置RabbitMQ给消费者发送fetchCount条消息后,需要等待有消息确认时,再继续给该消费者发送消息,
// 发送的消息数量为消费者fetchCount-当前未消费者未ack的消息数
//当fetchCount为1时,则RabbitMQ给消费者发送1条消息,等待这条消息ack后才会发下1条消息
channel.basicQos(fetchCount);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//一般是固定的,可以作为会话的名称
// System.out.println("consumerTag="+consumerTag);
//可以获取交换机、路由键等信息
// System.out.println("envelope="+envelope);
// System.out.println("properties="+properties);
System.out.println("body=" + new String(body, "utf-8"));
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//false 关闭自动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}



