本文我们来学习下RabbitMQ的几种工作模式,通过具体的demo实战来体会下RabbitMQ的美妙之处。
工作队列模式 简介工作队列模式(work queue):生产者发送消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。
代码实战新建maven工程,添加依赖:
com.rabbitmq amqp-client 5.9.0
生产者核心代码如下:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// ...省略配置uri的代码factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明一个消息队列
channel.queueDeclare("queue.wq", true, false, false, null);
// 声明direct交换器
channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);
// 将消息队列绑定到指定的交换器,并指定绑定键
channel.queueBind("queue.wq", "ex.wq", "key.wq");
for (int i = 0; i < 15; i++) {
channel.basicPublish("ex.wq",
"key.wq", null,
("工作队列:" + i).getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
在生产者服务中,我们设置了一个for循环,往消息队列里发布15次消息。
消费者核心代码:
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//...省略配置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("queue.wq", true,
false, false, null);
channel.basicConsume("queue.wq", new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("推送来的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("Cancel: " + consumerTag);
}
});
}
}
同时启动三个消费者服务,一个生产者服务。
在idea里面同时启动三个服务需做如下配置:
启动成功后,我们进入rabbitmq的管理界面,可以看到我们创建的队列及交换器信息:
执行结果控制台执行效果如下:
可以看到,rabbitmq会将消息通过近似轮询的方式分发给不同的消费者,消费者会独立消费各自获取到的消息。
发布订阅模式 简介在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
生产者将消息发送给交换器。交换器的作用是,从生产者接收消息,将消息推送给消息队列。交换器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器类型。
发布订阅模式使用的是fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。
发布订阅模式是将消息广播给所有订阅该消息的消费者。如图所示:
生产者核心代码:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//省略了设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明fanout类型的交换器
channel.exchangeDeclare("ex.myfan", "fanout", true, false, null);
for (int i = 0; i < 20; i++) {
channel.basicPublish("ex.myfan",
"", // fanout类型的交换器不需要指定路由键
null,
("hello world fan:" + i).getBytes("utf-8"));
}
channel.close();
connection.close();
}
消费者核心代码:
public class OneConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//省略了设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明临时队列,队列的名字由RabbitMQ自动生成
final String queueName = channel.queueDeclare().getQueue();
System.out.println("生成的临时队列的名字为:" + queueName);
channel.exchangeDeclare("ex.myfan",
BuiltinExchangeType.FANOUT,
true,
false,
null);
// fanout类型的交换器绑定不需要routingkey
channel.queueBind(queueName, "ex.myfan", "");
channel.basicConsume(queueName, (consumerTag, message) -> {
System.out.println("One " + new String(message.getBody(), "utf-8"));
}, consumerTag -> {});
}
}
我们创建了一个名叫“ex.myfan”的fanout类型交换器,然后在消费者中将交换器与临时队列进行绑定。
在服务器上执行rabbitmqctl list_bindings --formatter pretty_table命令查看当前rabbitmq中的交换器与消息队列的绑定关系如下:
执行结果此处我们复制三份消费者代码,模拟三个消费者服务进行测试,执行结果如下:
当我们将消费者客户端全部关闭之后,再在服务器上执行查看交换器和队列绑定关系信息的话,如下:
可以看到,队列都不在了,因为是临时的。
查看当前rabbitmq中交换器的状态:
交换器还在,因为是交换器是持久的。
注意:fanout类型的交换器的一个特点是类似于广播,当生产者发送消息的时候,如果消费者已经在线,那它能够收到消息,如果消费者当时不在线的话, 那么它在启动之后是收不到之前生产者发送的消息的。
路由模式 简介使用 direct 类型的Exchange,发N条消费并使用不同的 routingKey ,消费者定义队列并将队列、 routingKey 、Exchange绑定。此时使用 direct 模式Exchagne必须要 routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费。
现在我们想让接收者只接收部分消息,如,我们通过直接模式的交换器将关键的错误信息记录到log文件,同时在控制台正常打印所有的日志信息。
这就是我们要说的路由模式。
如图所示,我们在消费者一端定义各自所需要接受的消息队列的类型,分别和交换器进行绑定,实现不同消费者消费不同类型的消息。
代码实战生产者核心代码:
public class Producer {
private final static String[] LOG_LEVEL = {
"ERROR",
"FATAL",
"WARN"
};
private static Random random = new Random();
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//此处省略设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明direct类型的交换器,交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.routing", "direct", false, false, null);
for (int i = 0; i < 100; i++) {
String level = LOG_LEVEL[random.nextInt(100) % LOG_LEVEL.length];
channel.basicPublish("ex.routing", level, null, ("这是【" + level + "】的消息").getBytes());
}
}
}
消费者核心代码(此处只展示ErrorConsumer,FatalConsumer和WarnConsumer类似):
public class ErrorConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//此处省略设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare("ex.routing", "direct", false, false, null);
// 此处也可以声明为临时消息队列
channel.queueDeclare("queue.error", false, false, false, null);
channel.queueBind("queue.error", "ex.routing", "ERROR");
channel.basicConsume("queue.error", ((consumerTag, message) -> {
System.out.println("ErrorConsumer收到的消息:" + new String(message.getBody(), "utf-8"));
}), consumerTag -> { });
}
}
查看交换器与消息队列的绑定关系:
队列信息:
执行结果执行结果如下:
主题模式 简介topic,主题模式。使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。
要想 topic 类型的交换器, routingKey 就不能随便写了,它必须得是点分单词。单词可以随便
写,生产中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。该
点分单词字符串最长255字节。
bindingKey 也必须是这种形式。 topic 类型的交换器背后原理跟 direct 类型的类似:只要队列
的 bindingKey 的值与消息的 routingKey 匹配,队列就可以收到该消息。有两个不同:
* 匹配一个单词
#匹配0到多个单词
示例:
如下图所示,我们发送描述动物的消息。消息发送的时候指定的 routingKey 包含了三个词,两个点。
第一个单词表示动物的速度,第二个是颜色,第三个是物种:
创建三个绑定:
Q1绑定到" *.orange.* "
Q2绑定到" *.*.rabbit “和” lazy.# "。
描述:
- Q1关注orange颜色动物的消息Q2关注兔子的消息,以及所有懒的动物消息如果不能匹配,就丢弃消息。如果发送的消息 routingKey 是" lazy.orange.male.rabbit ",则会匹配最后一个绑定。
生产者核心代码如下所示:
public class Producer {
private static final String[] LOG_LEVEL = {"info", "error", "warn"};
private static final String[] LOG_AREA = {"beijing", "shanghai", "shenzhen"};
private static final String[] LOG_BIZ = {"edu-online", "biz-online", "emp-online"};
private static final Random RANDOM = new Random();
public static void main(String[] args) throws Exception {
final ConnectionFactory factory = new ConnectionFactory();
//省略uri设置代码 factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare("ex.topic", "topic", true, false, null);
String area, level, biz;
String routingKey, message;
for (int i = 0; i < 100; i++) {
area = LOG_AREA[RANDOM.nextInt(LOG_AREA.length)];
level = LOG_LEVEL[RANDOM.nextInt(LOG_LEVEL.length)];
biz = LOG_BIZ[RANDOM.nextInt(LOG_BIZ.length)];
// routingKey中包含了三个维度
routingKey = area + "." + biz + "." + level;
message = "LOG: [" + level + "] :这是 [" + area + "] 地区 [" + biz + "] 服务器发来的消息,MSG_SEQ = " + i;
channel.basicPublish("ex.topic", routingKey, null, message.getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
消费者代码(部分)如下:
public class BeijingConsumer {
public static void main(String[] args) throws Exception {
final ConnectionFactory factory = new ConnectionFactory();
//省略uri设置代码 factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 临时队列,返回值是服务器为该队列生成的名称
final String queue = channel.queueDeclare().getQueue();
channel.exchangeDeclare("ex.topic", "topic", true, false, null);
// beijing.biz-online.error
// 只要routingKey是以beijing开头的,后面不管几个点分单词,都可以接收
channel.queueBind(queue, "ex.topic", "beijing.#");
channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "utf-8"));
}, consumerTag -> {});
}
}
执行结果
源码
源码地址:码云demo地址
总结我们主要学习了RabbitMQ的四种工作模式,分别是工作队列模式、发布订阅模式、路由模式、主题模式,他们各自有各自的特点及使用场景。
更多更多我亲身经历的面试真题,还有想要内推大厂的小伙伴可以联系我,请关注微信公众号:【程序员资料站】,回复关键字 “面试” 获取更多面试资料,回复“内推”,我帮你内推大厂。



