一、介绍主体模式:二、代码:
sendrevc01 /02 三、测试:
主题模式:可以结合前面4中方式:简单、工作队列、交换机、路由
一、介绍主体模式:从结果可以看出生产者发送了多条设置了路由规则的消息,消费者可以根据具体的路由规则消费对应队列中的消息,而不是所有消费者都可以消费所有消息了。
问题:生产者产生的消息如果场景需求过多需要设置很多路由规则,可不可以减少?
解决:采用topic主题模式
路由key太多,难以管理–>加入通配符(类似正则表达式)
通过案例04看到消息通过交换机Exchange Type以及Routing Key规则,可以将消息路由到指定的队列,也符合在工作中的场景去使用的一种方式,对于RabbitMq 除了 direct 模式外,Mq 同样还提供了 topics 主题模式来对消息进行匹配路由,比如在项目开发中,拿商品模块来说,对于商品的查询功能在对商品进行查询时我们将查询消息路由到查询对应队列,而对于商品的添加、更新、删除等操作我们统一路由到另外一个队列来进行处理,此时采用direct 模式可以实现,但对于维护的队列可能就不太容易进行维护,如果涉及模块很多,此时对应队列数量就很多,此时我们就可以通过 topic 主题模式来对消息路由时进行匹配,通过指定的匹配模式将消息路由到匹配到的队列中进行后续处理。对于routing key匹配模式定义规则举例如下:
routing key为一个句点号 . 分隔的字符串(我们将被句点号 . 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”routing key中可以存在两种特殊字符
* 与 # ,用于做模糊匹配,
* 用于匹配一个单词
# 用于匹配多个单词(可以是零个)
package com.xxxx.topic.send;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
// 定义交换机名称
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
Connection connection = null;
Channel channel = null;
try {
// 通过工厂创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//创建消息
String infoMessage = "普通信息";
String errorMessage = "错误信息";
String warningMessage = "警告信息";
//准备路由
String infoRoutingKey = "info.message.orange";
String errorRoutingKey = "error.rabbit.lazy";
String warningRoutingKey = "orange.warning.message";
// 将产生的消息放入队列
// 发送消息(交换机、队列名称、额外发送消息、消息实体)
// 发送不同的消息,携带了不同的路由key
channel.basicPublish(EXCHANGE_NAME, infoRoutingKey, null, infoMessage.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, errorRoutingKey, null, errorMessage.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, warningRoutingKey, null, warningMessage.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + infoMessage + "'");
System.out.println(" [x] Sent '" + errorMessage + "'");
System.out.println(" [x] Sent '" + warningMessage + "'");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
// 关闭通道
if(null != channel && channel.isOpen()){
channel.close();
}
// 关闭连接
if (null != connection && connection.isOpen()){
connection.close();
}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
revc01 /02
package com.xxxx.topic.recv;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv01 {
// 队列名称
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.75.100");
factory.setPort(5672);
factory.setUsername("yeb");
factory.setPassword("yeb");
factory.setVirtualHost("/yeb");
try {
//1. 通过工厂创建连接
Connection connection = factory.newConnection();
//2. 获取通道,创建信道
Channel channel = connection.createChannel();
//3. 绑定交换机:
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
//获取队列 (排他队列)
String queueName = channel.queueDeclare().getQueue();
//4. 绑定队列: 只接受队列中的错误信息。
//将队列和交换机进行绑定
String routingKey = "#.message.#";
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
System.out.println(" [*] Waiting for messages. To exit pressCTRL+C");
// 获取消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列消费(队列名称、自动回值(当我的消费者收到消息后,告诉队列我收到消息了))
//false:手动去确认消息给队列
channel.basicConsume(queueName, true, deliverCallback, consumerTag
-> {
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
三、测试:
启动了两个revc消费者:查看路由:



