Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert
图解:
-
红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
-
黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配
需求:
-
生产者:创建 topic 类型的 Exchange,以及绑定两个队列
也就是说 队列2 可以接收 item.insert、item.update、item.delete 三种 routing key 消息,而 队列1 只能接收 item.insert、item.update 两种 routing key 消息。
-
设置 队列1 绑定发送 item.insert 、item.update 两种 routing key 的消息
-
设置 队列2 绑定发送 item.* 的通配符 routing key 的消息
-
1)生产者
使用topic类型的Exchange,发送消息的routing key有3种:item.insert、item.update、item.delete:
package com.lijw.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_Topic {
//交换机名称
static final String TOPIC_EXCHAGE = "topic_exchange";
//队列名称
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//队列名称
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); // ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/test"); //虚拟机 默认值 /
factory.setUsername("libai"); // 用户名 默认 guest
factory.setPassword("libai"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建交换机
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC, true, false, false, null);
// 6.声明(创建)队列
channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
// 7. 绑定队列和交换机
// 7.1 队列1 设置 item.insert item.update 的 routing key
channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.insert");
channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHAGE, "item.update");
// 7.2 队列2 设置 item.* 的 routing key
channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHAGE, "item.*");
//8. 发送消息至交换机,由交换机分发消息
// 发送信息
String message = "新增了商品。Topic模式;routing key 为 item.insert " ;
channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息
message = "修改了商品。Topic模式;routing key 为 item.update" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息
message = "删除了商品。Topic模式;routing key 为 item.delete" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
System.out.println("已发送消息:" + message);
//9. 释放资源
channel.close();
connection.close();
}
}
执行如下:
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 topic_exchange 的交换机,可以查看到如下的绑定:
可以看到交换机与队列的绑定规则。下面我们进入队列看看接收到的消息,如下:
2)消费者1
接收两种类型的消息:新增商品和更新商品
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topic1 {
//队列名称
static final String TOPIC_QUEUE_1 = "topic_queue_1";
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); // ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/test"); //虚拟机 默认值 /
factory.setUsername("libai"); // 用户名 默认 guest
factory.setPassword("libai"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收队列的数据 body: " + new String(body));
}
};
channel.basicConsume(TOPIC_QUEUE_1,true,consumer);
//不需要关闭资源,因为消费者需要持续监听队列信息
}
}
3)消费者2
接收所有类型的消息:新增商品,更新商品和删除商品。
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topic2 {
//队列名称
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); // ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/test"); //虚拟机 默认值 /
factory.setUsername("libai"); // 用户名 默认 guest
factory.setPassword("libai"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收队列的数据 body: " + new String(body));
}
};
channel.basicConsume(TOPIC_QUEUE_2,true,consumer);
//不需要关闭资源,因为消费者需要持续监听队列信息
}
}
3. 测试
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
-
消费者1:只可以接收 item.insert 和 item.update 消息
-
消费者2:可以接收 item.* 所有通配的消息
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。



