- 能解决模块之间的耦合度高,导致一个模块宕机后,后续全部功能都不能用了的问题,达到解耦的作用。能解决同步通讯的时间成本高的问题,达到异步通讯的作用,提升客户的体验。能解决高并发导致系统压力过大的问题,达到流量削峰的作用,减轻服务器压力。
市面上比较火爆的几款MQ:ActiveMQ,RocketMQ,Kafka,RabbitMQ。对比如下:
- 语言的支持:ActiveMQ,RocketMQ只支持JAVA语言,Kafka,RabbitMQ支持多门语言;效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别的,RabbitMQ是微秒级别的;针对消息丢失,消息重复的问题它们四种都有其各自一套解决方法:RabbitMQ针对消息的持久化和重复性问题都有比较成熟的解决方案;学习成本:RabbitMQ非常简单,简单到令人发指。
RabbitMQ效率高的原因:RabbitMQ是基于erlang开发,erlong有一个特点叫面向并发编程,所以并发能力很强,性能极好。
RabbitMQ最初是由Rabbit公司研发和维护的,最终是在Pivotal公司维护的,springboot就是Pivotal公司研发的。RabbitMQ严格遵守高效消息队列协议——AMQP协议,帮助我们在进程之间传递异步消息。
详细介绍请看
- Publisher:生产者,发布消息到RabbitMQ中的ExchangeConsumer:消费者,监听RabbitMQ中的Queue中的消息Exchange:交换机,和生产者建立连接并接收生产者的消息Queue:队列,Exchange会将消息分发到指定的Queue,Queue和消费者进行交互Routes:路由,交换机以什么样的策略将消息发布到Queue
- 创建Maven项目导入依赖
com.rabbitmq amqp-client 5.9.0 junit junit 4.12 junit junit 4.12 compile
- 创建工具类,连接RabbitMQ
public class RabbitMQClient {
public static Connection getConnection() {
// 创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("test");
factory.setPassword("test");
factory.setVirtualHost("/test");
// 创建Connection
Connection connection = null;
try {
connection = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
// 返回Connection
return connection;
}
}
- 创建测试类,连接RabbitMQ
public class Demo1 {
@Test
public void getConnection() throws IOException {
// 获取Connection
Connection connection = RabbitMQClient.getConnection();
// 其他操作
System.in.read();
// 关闭Connection
connection.close();
}
}
连接成功,RabbitMQ控制台会有一个Connection,如下图:
一个生产者,一个默认的交换机,一个队列,一个消费者
- 创建生产者,创建一个channel,发布消息到exchange,指定路由规则,发送到队列
public class Publisher {
@Test
public void publish() throws Exception{
// 1.获取连接Connection
Connection connection = RabbitMQClient.getConnection();
// 2.创建Channel
Channel channel = connection.createChannel();
// 3.发布消息到Exchange,同时指定路由的规则
String msg = "我要成为高级开发工程师!!!";
channel.basicPublish("","HelloWorld",null,msg.getBytes(StandardCharsets.UTF_8));
//Ps: exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息
System.out.println("生产者发布消息成功!");
// 释放资源
channel.close();
connection.close();
}
}
- 创建消费者,创建一个channel,创建一个队列,并且去消费当前队列。
public class Consumer {
@Test
public void consume() throws Exception {
// 1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
// 2. 创建channel
Channel channel = connection.createChannel();
// 3. 声明队列-HellWorld
channel.queueDeclare("HelloWorld",true,false,false,null);
// 4. 开启监听Queue,并指定消费者consumer
channel.basicConsume("HelloWorld",true,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到的消息是:" + new String(body,"UTF-8"));
}
});
System.out.println("消费者开始监听消息...");
// 实时监听,保证程序不能停止
System.in.read();
// 5. 释放资源
channel.close();
connection.close();
}
}
RabbitMQ通讯方式二:Work
只需要在消费者端,添加Qos能力以及更改为手动ack,即可让消费者根据自己的能力去消费指定队列中的消息,而不是默认情况下由RabbitMQ平均分配了。
示例:一个生产者,一个默认的交换机,一个队列,两个消费者
- 创建生产者,创建一个channel,发布消息到exchange,指定路由规则,发送到队列
public class Publisher {
@Test
public void publish() throws Exception{
// 1.获取连接Connection
Connection connection = RabbitMQClient.getConnection();
// 2.创建Channel
Channel channel = connection.createChannel();
// 3.发布消息到Exchange,同时指定路由的规则
for (int i = 1; i <= 10; i++) {
String msg = i + ". 我要成为高级开发工程师!!!";
channel.basicPublish("","Work",null,msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("生产者发布消息成功!");
// 释放资源
channel.close();
connection.close();
}
}
- 创建消费者1,创建一个channel,创建一个队列,设置消费能力-Qos,去消费当前队列,并关闭自动ACK,打开手动ACK。
public class Consumer1 {
@Test
public void consume() throws Exception {
// 1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
// 2. 创建channel
Channel channel = connection.createChannel();
// 3. 声明队列-Work
channel.queueDeclare("Work", true, false, false, null);
// 指定当前消费者一次消费多少个消息
channel.basicQos(1);
// 指定消费者每次消费多少个消息后,需要将自动ack关闭,消费者需要手动ack
// 4. 开启监听Queue,并指定消费者consumer
channel.basicConsume("Work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接受到的消息是:" + new String(body, "UTF-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
System.out.println("消费者开始监听消息...");
// 实时监听,保证程序不能停止
System.in.read();
// 5. 释放资源
channel.close();
connection.close();
}
}
- 创建消费者2,创建一个channel,创建一个队列,设置消费能力-Qos,去消费当前队列,并关闭自动ACK,打开手动ACK。
public class Consumer2 {
@Test
public void consume() throws Exception {
// 1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
// 2. 创建channel
Channel channel = connection.createChannel();
// 3. 声明队列-Work
channel.queueDeclare("Work", true, false, false, null);
// * 指定当前消费者一次消费多少个消息
channel.basicQos(2);
// 指定消费者每次消费多少个消息后,* 需要将自动ack关闭,消费者需要手动ack
// 4. 开启监听Queue,并指定消费者consumer
channel.basicConsume("Work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接受到的消息是:" + new String(body, "UTF-8"));
// * 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
System.out.println("消费者开始监听消息...");
// 实时监听,保证程序不能停止
System.in.read();
// 5. 释放资源
channel.close();
connection.close();
}
}
通讯方式三:Publish/Subscribe
基于Work在生产者中声明Fanout类型的exchange,并且可以将exchange和多个queue绑定在一起,绑定的方式是直接绑定。
示例:一个生产者,一个交换机,两个队列,两个消费者。
- 创建生产者,创建一个Fanout类型的channel,发布消息到exchange,指定路由规则,发送到队列
public class Publisher {
@Test
public void publish() throws Exception {
// 1.获取连接Connection
Connection connection = RabbitMQClient.getConnection();
// 2.创建Channel
Channel channel = connection.createChannel();
// 3.创建exchange - * 绑定某些队列
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
channel.queueBind("pubsub-queue1", "pubsub-exchange", "");
channel.queueBind("pubsub-queue2", "pubsub-exchange", "");
// 4.发布消息到Exchange,同时指定路由的规则
for (int i = 1; i <= 10; i++) {
String msg = i + ". 我要成为高级开发工程师!!!";
channel.basicPublish("pubsub-exchange", "", null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.println("生产者发布消息成功!");
// 5.释放资源
channel.close();
connection.close();
}
}
- 创建消费者1,创建一个channel,创建一个队列,指定监听的队列,去消费当前队列
public class Consumer1 {
@Test
public void consume() throws Exception {
// 1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
// 2. 创建channel
Channel channel = connection.createChannel();
// 3. 声明队列:pubsub-queue1
channel.queueDeclare("pubsub-queue1", true, false, false, null);
// 4. 开启监听Queue,并指定消费者consumer
channel.basicConsume("pubsub-queue1", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号接受到的消息是:" + new String(body, "UTF-8"));
}
});
System.out.println("消费者开始监听消息...");
// 实时监听,保证程序不能停止
System.in.read();
// 5. 释放资源
channel.close();
connection.close();
}
}
- 创建消费者2,创建一个channel,创建一个队列,指定监听的队列,去消费当前队列
public class Consumer2 {
@Test
public void consume() throws Exception {
// 1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
// 2. 创建channel
Channel channel = connection.createChannel();
// 3. 声明队列:pubsub-queue2
channel.queueDeclare("pubsub-queue2", true, false, false, null);
// 4. 开启监听Queue,并指定消费者consumer
channel.basicConsume("pubsub-queue2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号接受到的消息是:" + new String(body, "UTF-8"));
}
});
System.out.println("消费者开始监听消息...");
// 实时监听,保证程序不能停止
System.in.read();
// 5. 释放资源
channel.close();
connection.close();
}
}
通讯方式四:Routing
生产者的channel绑定多个队列,并指定RoutingKey,发送消息时指定RoutingKey,就会将消息发送到绑定相同的Routing的队列上,然后消费者监听哪个队列就会收到哪个队列上的消息了。
示例:一个生产者,一个默认交换机,两个队列,三个路由,两个消费者
- 创建生产者,创建一个channel,绑定上两个队列,一个队列指定一个路由,一个队列指定两个路由
public class Publisher {
@Test
public void publish() throws Exception {
// 1.获取连接Connection
Connection connection = RabbitMQClient.getConnection();
// 2.创建Channel
Channel channel = connection.createChannel();
// 3.创建exchange - 绑定某一个队列
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
channel.queueBind("routing-queue-error", "routing-exchange", "ERROR");
channel.queueBind("routing-queue-error", "routing-exchange", "DEBUG");
channel.queueBind("routing-queue-info", "routing-exchange", "INFO");
// 4.发布消息到Exchange,同时指定路由的规则
String msg = "我要成为高级开发工程师!!!";
channel.basicPublish("routing-exchange", "INFO", null, msg.getBytes(StandardCharsets.UTF_8));
channel.basicPublish("routing-exchange", "INFO", null, msg.getBytes(StandardCharsets.UTF_8));
channel.basicPublish("routing-exchange", "INFO", null, msg.getBytes(StandardCharsets.UTF_8));
msg = msg + "实现失败!";
channel.basicPublish("routing-exchange", "ERROR", null, msg.getBytes(StandardCharsets.UTF_8));
msg = msg + "这是一个BUG!";
channel.basicPublish("routing-exchange", "DEBUG", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发布消息成功!");
// 5.释放资源
channel.close();
connection.close();
}
}
- 创建消费者1,创建一个channel,绑定其中一个的队列,监听队列,消费消息
public class Consumer1 {
@Test
public void consume() throws Exception {
// 1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
// 2. 创建channel
Channel channel = connection.createChannel();
// 3. 声明队列:pubsub-queue1
channel.queueDeclare("routing-queue-error", true, false, false, null);
// 4. 开启监听Queue,并指定消费者consumer
channel.basicConsume("routing-queue-error", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者ERROR接受到的消息是:" + new String(body, "UTF-8"));
}
});
System.out.println("消费者开始监听消息...");
// 实时监听,保证程序不能停止
System.in.read();
// 5. 释放资源
channel.close();
connection.close();
}
}
- 创建消费者2,创建一个channel,绑定另一个的队列,监听队列,消费消息
public class Consumer2 {
@Test
public void consume() throws Exception {
// 1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
// 2. 创建channel
Channel channel = connection.createChannel();
// 3. 声明队列:pubsub-queue2
channel.queueDeclare("routing-queue-info", true, false, false, null);
// 4. 开启监听Queue,并指定消费者consumer
channel.basicConsume("routing-queue-info", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者INFO接受到的消息是:" + new String(body, "UTF-8"));
}
});
System.out.println("消费者开始监听消息...");
// 实时监听,保证程序不能停止
System.in.read();
// 5. 释放资源
channel.close();
connection.close();
}
}
通讯方式五:Topic
生产者的channel绑定多个队列,并指定RoutingKey,这里指定RoutingKey可以使用通配符‘#’和占位符‘’,发送消息时指定适配的RoutingKey,就会将消息发送到绑定适配的Routing的队列上,然后消费者监听哪个队列就会收到哪个队列上的消息了。
示例:一个生产者,一个默认交换机,八个队列,使用通配符 ‘#’ 和占位符 ‘’ 的路由,两个消费者
- 创建生产者,创建一个channel,绑定上八个队列,一个队列指定一种路由
public class Publisher {
// 交换机名称
private static final String exchangeName = "topic-exchange";
@Test
public void publish() throws Exception {
// 1.获取连接Connection
Connection connection = RabbitMQClient.getConnection();
// 2.创建Channel
Channel channel = connection.createChannel();
// 3.创建exchange - 绑定某一个队列
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);
// topic与Routing不同的是:指定RoutingKey可以使用通配符#,占位符*
channel.queueBind("topic-queue1", exchangeName, "order.*");
channel.queueBind("topic-queue2", exchangeName, "order.#");
channel.queueBind("topic-queue3", exchangeName, "*.order");
channel.queueBind("topic-queue4", exchangeName, "#.order");
channel.queueBind("topic-queue5", exchangeName, "#.order.*");
channel.queueBind("topic-queue6", exchangeName, "*.order.*");
channel.queueBind("topic-queue7", exchangeName, "#.order.#");
channel.queueBind("topic-queue8", exchangeName, "*.order.#");
// 4.发布消息到Exchange,同时指定路由的规则
String msg = "我要成为高级开发工程师!!!";
channel.basicPublish(exchangeName, "order.item", null, msg.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "macbook.order", null, msg.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "macbook.order.item", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发布消息成功!");
// 5.释放资源
channel.close();
connection.close();
}
}
- 创建八个消费者分别监听这个八个队列
public class Consumer1 {
// 队列名称
private static final String queueName = "topic-queue1";
@Test
public void consume() throws Exception {
// 1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
// 2. 创建channel
Channel channel = connection.createChannel();
// 3. 声明队列:pubsub-queue1
channel.queueDeclare(queueName, true, false, false, null);
// 4. 开启监听Queue,并指定消费者consumer
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者ERROR接受到的消息是:" + new String(body, "UTF-8"));
}
});
System.out.println("消费者开始监听消息...");
// 实时监听,保证程序不能停止
System.in.read();
// 5. 释放资源
channel.close();
connection.close();
}
}
此处就创建一个,其余七个一样,需要改一下queueName和类名。
测试结果: topic-queue1 收到 1 条消息; topic-queue2 收到 1 条消息; topic-queue3 收到 3 条消息; topic-queue4 收到 2 条消息; topic-queue5 收到 3 条消息; topic-queue6 收到 2 条消息; topic-queue7 收到 3 条消息; topic-queue8 收到 2 条消息;
测试结果出人意料:为什么topic-queue3会收到三条消息?



