RabbitMQ消息传递模型的核心思想是:生产者的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递到了哪些队列中。
生产者只能将消息发送到交换机(exchange)。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。
交换机必须确切知道如何处理收到的消息:是应该把这些消息放到特定队列,还是说把他们放到许多队列,还是说应该丢弃它们。这就的由交换机的类型来决定。总共有以下几个类型:
直接(direct)主题(topic)标题(headers)扇出(fanout)
默认交换机:在没有指定交换机之前,能实现消息发送的原因,是因为使用的是默认交换机,通过空字符串进行标识。第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中,其实是由routingKey(bindingkey)绑定key指定的,如果它存在的话;
channel.basiPublish("交换机","routingkey",消息属性props,"消息");
没有指定交换机时,routingkey可以是队列名称 2、临时队列
每当连接到RabbitMQ时,都需要一个全新的空队列,为此可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。一旦断开了消费者的连接,临时队列将被自动删除
创建临时队列的方式:channel.queueDeclare().getQueue()
3、Bindings绑定binding 是 exchange 和 queue 之间的桥梁,它告诉exchange和那个队列进行了绑定关系。
比如:下面这张图就是×与Q1和Q2进行了绑定
一个交换机可以通过不同的Routing Key与不同的队列绑定
4、FanoutFanout扇出:交换机将接收到的所有消息 广播 到所有队列中;
交换机的默认类型就是:fanout
扇出类型就是发布订阅模式式
实战
消费者
package com.tuwer.rabbitmq.fanout;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.tuwer.utils.RabbitMqUtils;
import java.io.IOException;
public class ReceiveLog01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 得到通道
Channel channel = mqUtils.getChannel(
"192.168.19.101",
5672,
"admin",
"admin",
"/",
"消费者01(获取日志)");
// 声明交换机:交换机的默认类型就是fanout,可以省略声明
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 获取随机队列
String queueName = channel.queueDeclare().getQueue();
// 队列绑定交换机:由于交换机类型为fanout,可以广播到所有队列,所以不需要routingKey
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("消费者01开始接收日志...");
// 确认接收
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
};
// 未确认接收
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息" + consumerTag + "接收失败!");
};
// 接收消息
channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
}
}
生产者
package com.tuwer.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.tuwer.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 得到通道
Channel channel = mqUtils.getChannel(
"192.168.19.101",
5672,
"admin",
"admin",
"/",
"生产者(发送日志)");
try {
// 声明交换机:交换机的默认类型就是fanout,可以省略声明
//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 发送消息
// 消息
String message = "";
// 循环发送消息
for (int i = 1; i < 11; i++) {
message = "Hello World! " + i;
// fanout类型,不需要routingKey
channel.basicPublish(
EXCHANGE_NAME,
"",
null,
message.getBytes());
System.out.println("第" + i + "条消息已发送!");
try {
// 休眠1秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
// 关闭
mqUtils.close();
}
}
}
测试
5、DirectDirect:路由模式;交换机根据RoutingKey向特定队列发送消息
交换机的声明只需要在生产者或消费者一方就可以,适用于其它类型,建议两端都声明,如果先启动的一方中没有声明,将会用默认交换机类型
消费者
// 接收控制台console队列消息:info和warning
public class ReceiveLogDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String QUEUE_NAME = "console";
public static void main(String[] args) throws IOException {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 得到通道
Channel channel = mqUtils.getChannel(
"192.168.19.101",
5672,
"admin",
"admin",
"/",
"控制台获取日志");
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明队列:可以省略,也可获取随机队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 队列绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
System.out.println("控制台开始接收日志...");
// 确认接收
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
};
// 未确认接收
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息" + consumerTag + "接收失败!");
};
// 接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
// 接收磁盘disk队列消息:error
public class ReceiveLogDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String QUEUE_NAME = "disk";
public static void main(String[] args) throws IOException {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 得到通道
Channel channel = mqUtils.getChannel(
"192.168.19.101",
5672,
"admin",
"admin",
"/",
"磁盘获取日志");
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 队列绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
System.out.println("磁盘开始接收日志...");
// 确认接收
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
};
// 未确认接收
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息" + consumerTag + "接收失败!");
};
// 接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
生产者:可以向指定的routingkey中发消息
package com.tuwer.rabbitmq.exchange.direct;
import com.rabbitmq.client.Channel;
import com.tuwer.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class EmitLog {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 得到通道
Channel channel = mqUtils.getChannel(
"192.168.19.101",
5672,
"admin",
"admin",
"/",
"生产者(发送日志)");
try {
// 声明交换机
//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 发送消息
// 消息
String message = "";
// 循环发送消息
for (int i = 1; i < 11; i++) {
message = "Hello World! " + i;
channel.basicPublish(
EXCHANGE_NAME,
"error",
null,
message.getBytes());
System.out.println("第" + i + "条消息已发送!");
try {
// 休眠1秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
// 关闭
mqUtils.close();
}
}
}
测试
6、TopicsTopics:主题类型
发送到类型是Topics交换机的消息的RoutingKey不能随意写,必须满足一定的要求,它必须是一个 单词列表,以 点号 分隔开,这些单词可以是任意单词。
比如:“stock.usd.nyse” , “nyse.vmw”,"quick.orange.rabbit"这种类型的。
单词列表最多不能超过255个字节。
在这个规则列表中,其中有两个替换符:
* 代替一个单词 1# 代替零个或多个单词 >=0 2)案例解析
Q1绑定的是:中间带orange的三个单词的字符串:*.orange.*
Q2绑定的是:最后一个单词是rabbit的单个单词:*.*.rabbit,第一个单词是lazy的多个单词:lazy.#
数据接收情况如下:
quick.orange.rabbit:被队列Q1、Q2接收到quick.orange.fox:被队列Q1接收到lazy.brown.fox:被队列Q2接收到lazy.pink.rabbit:虽然满足队列Q2的两个绑定,但是只会被接收一次quick.orange.male.rabbit:四个单词不匹配任何绑定,会被丢弃lazy.orange.male.rabbit:被队列Q2接收到 3)总结
交换机类型是 Topics 的情况下:
当队列绑定键RoutingKey是 #,那么这个队列将接收所有数据,就有点像Fanout了(广播)当队列绑定键RoutingKey当中没有 # 和 * 出现,那么该队列绑定类型就是Direct了(完全匹配) 4)实战
消费者
package com.tuwer.rabbitmq.exchange.topics;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.tuwer.utils.RabbitMqUtils;
import java.io.IOException;
public class ReceiveLogTopics01 {
private static final String EXCHANGE_NAME = "topics_logs";
private static final String QUEUE_NAME = "q1";
public static void main(String[] args) throws IOException {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 得到通道
Channel channel = mqUtils.getChannel(
"192.168.19.101",
5672,
"admin",
"admin",
"/",
"消费者(主题|Q1)");
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明队列:可以省略,也可获取随机队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 队列绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
System.out.println("消费者01开始接收Q1队列消息...");
// 确认接收
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息【" + new String(message.getBody())
+ "】 RoutingKey【" + message.getEnvelope().getRoutingKey() + "】");
};
// 未确认接收
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息" + consumerTag + "接收失败!");
};
// 接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
// 消费者2和消费者1中只有名称和RoutingKey不同
public class ReceiveLogTopics02 {
// ...
private static final String QUEUE_NAME = "q2";
public static void main(String[] args) throws IOException {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 得到通道
Channel channel = mqUtils.getChannel(
"192.168.19.101",
5672,
"admin",
"admin",
"/",
"消费者(主题|Q2)");
// ...
// 队列绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
System.out.println("消费者02开始接收Q2队列消息...");
// ...
}
}
生产者
package com.tuwer.rabbitmq.exchange.topics;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.tuwer.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class EmitLogTopics {
private static final String EXCHANGE_NAME = "topics_logs";
public static void main(String[] args) {
// 工具类
RabbitMqUtils mqUtils = new RabbitMqUtils();
// 得到通道
Channel channel = mqUtils.getChannel(
"192.168.19.101",
5672,
"admin",
"admin",
"/",
"生产者(Topic|发送日志)");
// 要发送的消息及RoutingKey
Map map = new HashMap<>();
map.put("quick.orange.rabbit", "被队列Q1、Q2接收到");
map.put("quick.orange.fox", "被队列Q1接收到");
map.put("lazy.brown.fox", "被队列Q2接收到 ");
map.put("lazy.pink.rabbit", "虽然满足队列Q2的两个绑定,但是只会被接收一次");
map.put("quick.orange.male.rabbit", "四个单词不匹配任何绑定会被丢弃");
map.put("lazy.orange.male.rabbit", "被队列Q2接收到");
try {
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey = "";
String message = "";
for (String s : map.keySet()) {
routingKey = s;
message = map.get(s);
// 发消息
channel.basicPublish(
EXCHANGE_NAME,
routingKey,
null,
message.getBytes());
System.out.println("【" + message + "】已发送!");
// 休眠1秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭
mqUtils.close();
}
}
}
测试
7、类型区别| 交换机类型 | 匹配RoutingKey | 特点 |
|---|---|---|
| 扇出 Fanout | 不需要匹配;广播转发,可以发送到所有队列 | 不需要匹配 |
| 直接 Direct | 完全匹配;只发到特定的队列 | 完全匹配 |
| 主题 Topic | 选择匹配;按通配符发送到特定的队列 | 选择匹配 |



