既然要说rabbitmq,那肯定首先要安装上了!
安装rabbitmq教程
最终项目放到了gitee上了,gitee地址
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
- 为什么使用MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
开发中消息队列通常有如下应用场景:
- 任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
- 应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
AMQR和JMS
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。
AMQPJMS全称Advanced Message Queuing Protocol
AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
AMQP 与 JMS 区别
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。 JMS规定了两种消息模式;而AMQP的消息模式更加丰富
消息队列产品
市场上常见的消息队列有如下:
- ActiveMQ:基于JMS
- ZeroMQ:基于C语言开发
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品
- Kafka:类似MQ的产品;分布式消息系统,高吞吐量
RabbitMQ
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ提供了7种模式:
- 简单模式,
- work模式,
- Publish/Subscribe发布与订阅模式,
- Routing路由模式,
- Topics
- 主题模式,
- RPC远程调用模式(远程调用,不太算MQ;不作介绍);
官网对应模式介绍
二、用户以及Virtual Hosts配置
访问http://localhost:15672/
用户角色
- 进入用户界面
角色说明:
- 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。 - 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) - 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。 - 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。 - 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
Virtual Hosts配置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual
Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。
exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
简单来说就是盘符目录一样。
1. 创建Virtual Hosts2. 设置Virtual Hosts权限
点击进入
设置用户权限
三、RabbitMQ基础案例 创建工程
过程略(建议是springboot,后面会用到)
添加依赖写一个工具类org.springframework.boot spring-boot-starter-parent 2.1.6.RELEASE org.springframework.boot spring-boot-starter-amqp
获得rabbitmq连接对象
package cn.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtil {
//端口号
private static int port = 5672;
//ip地址
private static String host = "127.0.0.1";
//虚拟目录
private static String virtualHost="/blog";
//用户名(注意该用户必须有上面虚拟目录的对应的权限)
private static String username="lxl001";
//密码
private static String password="123";
public static Connection getConnection(){
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
Connection connection= null;
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
简单模式案例
简单模式概述
编写消息生产者和消费者简单模式:
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
在rabbitMQ中消费者是一定要到某个消息队列中去获取消息的
编写消息生产者
package cn.rabbitmq.simple;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class SimpleProducer {
static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) {
try {
//创建一个连接
Connection connection= ConnectionUtil.getConnection();
//创建一个频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
String message="你好,我是简单模式";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息已发送:"+message);
//关闭连接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
编写消息消费者
package cn.rabbitmq.simple;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class SimpleConsumer {
static final String QUEUE_NAME=SimpleProducer.QUEUE_NAME;
public static void main(String[] args) {
try {
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行
首先运行消息生产者
如下:
— 然后我们打开rabbitmq的可视化界面查看
— 接着我们运行消息的消费者
可以看到接收到了生产者发送的消息。
然后再次查看可视化界面
探讨 如何通过可视化界面查看生产者的消息
首先停掉刚才的消费者服务,因为该服务一直处于监听状态下,不会接收到一条消息便停止服务,图示如下:
关掉它!
首先我们再次发送一个消息,如下:
打开可视化界面,点击刚才发送消息的队列名
消息信息内容
上面可以看到要选择Nack,那为什么要选择Nack呢?
ACK和NACK顾名思义:前者是响应,后者是不响应。
先看效果,再解释。
接着上面的运行结果,我们在可视化界面查看信息后,然后我们运行消息的消费者。
运行消费者
发现信息还可以被查看。
然后我们再次关闭消费者服务,然后运行提供者
然后可视化界面,再次查看信息,但是我们这次选择Ack查看
然后再次运行消费者服务
发现消费者这里没看到这条消息,是因为Ack查看将刚才的那条消息做出响应,随后队列会移除该消息。
所以简单的查看信息要选择Nack查看消息,避免队列消费者接收不到该消息导致系统的某个服务未做出对应的响应,造成数据丢失!!!
那么如果一直Nack查看会是怎么样呢?
将消息提供者和消费者服务停止
我们稍加改造一下消息消费者,如下
— 接下来启动提供者服务
启动消费者
因为选择是Nack,未为该消息做出响应,然后我们停止消费者。启动提供者 班主任看小明没回消息,再次发消息提醒
— 启动消费者
此时小明一次收到了两条消息,但是小明还是没有回复,然后关闭消费者服务 班主任发明天上午上课的消息:
消费者再次启动
— 此时小明一次收到了三条消息,然后小明还是没有回复只是看了下, 过了一会,小明觉得还是要回复一下才好,不然班主任一直给他发送这三条(注意是这三条) 然后我们修改消费者服务
然后重启消费者服务
小明接受到三条消息,并Ack响应确认了。 然后关闭消费者服务 启动提供者服务
接着启动消费者服务
此时小明只能接收到Ack响应之后,班主任发送的消息。
工作模式案例 工作模式概述
工作模式
Work Queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
下面来一个大致案例思路:
张三给李四发消息,通过两种方式一个是微信wechat,一个是邮箱email,李四只要有一个方式接收到了,另外一种方式就不再接收到(微信收到,短信就收不到了)
编写消息生产者和消费者
消息生产者(张三):
package cn.rabbitmq.work;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class WorkProducer {
static final String QUEUE_NAME="work_queue";
public static void main(String[] args) {
try {
//创建一个连接
Connection connection= ConnectionUtil.getConnection();
//创建一个频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
String message="你好李四,我是张三,明天有个会要开";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息已发送:"+message);
//关闭资源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
邮箱消费者(李四)
package cn.rabbitmq.work;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class WorkConsumer_email {
static final String QUEUE_NAME=WorkProducer.QUEUE_NAME;
public static void main(String[] args) {
try {
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("邮箱接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
微信消费者(李四)
package cn.rabbitmq.work;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class WorkConsumer_wechat {
static final String QUEUE_NAME=WorkProducer.QUEUE_NAME;
public static void main(String[] args) {
try {
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("微信接收到的消息为:" + new String(body, "utf-8"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
启动
启动消息提供者(张三发消息给李四)
启动消息消费者(李四打开微信、邮箱)
发现微信收到消息
邮箱没收到
因为上面的邮箱和微信属于竞争关系,一方拿到该消息,另一方则拿不到。
订阅模式类型解读
订阅模式:
前面2个案例中,只有3个角色:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列(发布订阅模式)
- Direct:定向,把消息交给符合指定routing key 的队列(路由模式)
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列(通配模式)
- Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
总而言之,发布订阅模式、路由模式以及通配模式的不同之处就在于交换机发送给队列的发送条件。
Publish/Subscribe发布与订阅模式 模式说明
发布订阅模式
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息
下面案例大致思路:
小明购买一个商品,要生成一个订单,并且商品的库存减一,因为在购买商品时,要将消息发送给订单微服务以及商品微服务。
消息生产者:
package cn.rabbitmq.ps;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class PsProducer {
//交换机名称
static final String FANOUT_EXCHANGE="fanout_exchange";
//订单微服务队列名称
static final String FANOUT_QUEUE_ORDER="fanout_queue_order";
//产品微服务队列名称
static final String FANOUT_QUEUE_PRODUCT="fanout_queue_product";
public static void main(String[] args) {
try {
//创建一个连接
Connection connection= ConnectionUtil.getConnection();
//创建一个频道
Channel channel=connection.createChannel();
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_ORDER,true,false,false,null);
channel.queueDeclare(FANOUT_QUEUE_PRODUCT,true,false,false,null);
//将队列绑定到交换机
channel.queueBind(FANOUT_QUEUE_PRODUCT,FANOUT_EXCHANGE,"");
channel.queueBind(FANOUT_QUEUE_ORDER,FANOUT_EXCHANGE,"");
//发送消息
String message="小明购买了一个游戏本电脑";
channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
System.out.println("消息已发送:"+message);
//关闭资源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
订单消息消费者
package cn.rabbitmq.ps;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class PsConsumer_order {
//交换机名称
static final String FANOUT_EXCHANGE=PsProducer.FANOUT_EXCHANGE;
//订单微服务队列 名称
static final String FANOUT_QUEUE_ORDER=PsProducer.FANOUT_QUEUE_ORDER;
public static void main(String[] args) {
try {
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_ORDER, true, false, false, null);
// 队列 绑定交换机
channel.queueBind(FANOUT_QUEUE_ORDER, PsProducer.FANOUT_EXCHANGE, "");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者2接收到的消息为:" + new String(body, "utf-8"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(FANOUT_QUEUE_ORDER, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
商品消息消费者:
package cn.rabbitmq.ps;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class PsConsumer_product {
//交换机名称
static final String FANOUT_EXCHANGE=PsProducer.FANOUT_EXCHANGE;
//队列1名称
static final String FANOUT_QUEUE_PRODUCT=PsProducer.FANOUT_QUEUE_PRODUCT;
public static void main(String[] args) {
try {
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_PRODUCT, true, false, false, null);
// 队列 绑定交换机
channel.queueBind(FANOUT_QUEUE_PRODUCT, PsProducer.FANOUT_EXCHANGE, "");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者1接收到的消息为:" + new String(body, "utf-8"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(FANOUT_QUEUE_PRODUCT, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行
启动消息提供者和消息消费者
可以看到订单消息消费者和商品消息消费者都收到了小明购买商品的这条消息。
Routing路由模式 模式说明
路由模式特点:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的
Routingkey 与消息的 Routing key 完全一致,才会接收到消息
路由模式
图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
下面案例的大致思想:
在业务系统中,一个商品会有许多个业务,其中最重要的就是新增和更新,接下来的案例中会有一个新增的消息队列(insert),一个更新的消息队列(update),绑定到一台交换机上,交换机绑定队列时使用不同的routingkey,发送时指定发送给哪一个key,实现路由。
编写消息生产者和消费者
RoutingProducer
package cn.rabbitmq.routing;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class RoutingProducer {
//交换机名称
static final String DIRECT_EXCHANGE = "direct_exchange_demo1";
//队列名称 更新商品队列名称
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//队列名称 新增商品队列名称
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) {
try {
//创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明(创建)队列
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
//队列绑定交换机
//新增服务 路由 key
String routingKey_insert="insert";
//更新服务 路由 key
String routingKey_update="update";
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHANGE, routingKey_insert);
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHANGE, routingKey_update);
//一、发送消息 新增消息
String message = "新增了商品。路由模式;routing key 为 "+routingKey_insert ;
channel.basicPublish(DIRECT_EXCHANGE,routingKey_insert,null,message.getBytes());
System.out.println("消息已发送:"+message);
//二、发送消息 更新消息
message = "更新了商品。路由模式;routing key 为 "+routingKey_update ;
channel.basicPublish(DIRECT_EXCHANGE,routingKey_update,null,message.getBytes());
System.out.println("消息已发送:"+message);
//关闭资源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
新增商品的消息队列消费者
package cn.rabbitmq.routing;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class RoutingConsumer_insert {
//交换机名称
static final String DIRECT_EXCHANGE = RoutingProducer.DIRECT_EXCHANGE;
//队列名称
static final String DIRECT_QUEUE_INSERT = RoutingProducer.DIRECT_QUEUE_INSERT;
public static void main(String[] args) {
try {
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
//新增服务 路由 key
String routingKey_insert="insert";
// 队列 绑定交换机
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHANGE, routingKey_insert);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("新增商品消费者接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(DIRECT_QUEUE_INSERT, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
更新商品的消息队列的消费者
package cn.rabbitmq.routing;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class RoutingConsumer_update {
//交换机名称
static final String DIRECT_EXCHANGE = RoutingProducer.DIRECT_EXCHANGE;
//队列名称
static final String DIRECT_QUEUE_UPDATE = RoutingProducer.DIRECT_QUEUE_UPDATE;
public static void main(String[] args) {
try {
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
//更新服务 路由 key
String routingKey_update="update";
// 队列 绑定交换机
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHANGE, routingKey_update);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者1接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(DIRECT_QUEUE_UPDATE, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
启动
依次启动消息的提供者和消费者
启动消息的提供者,发送消息
新增商品消息消费者获取消息
更新商品消息消费者获取消息
上述可以看出交换机会根据不同的routingkey路由key来指定绑定在该交换机上的消息队列接收消息,
注意一定是绑定到该交换机上的消息队列
比如
订单也有一个insert的业务,然后订单微服务将insert的路由key的消息队列绑定到了订单交换机上,但是当商品交换机给insert的消息队列发送消息时,订单交换机上的路由key为insert的消息队列取不到消息。(不同交换机是相互隔离的)
小结:
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
Topics通配符模式 模式说明
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
# :匹配一个或多个词
* :匹配不多不少恰好1个词
举例:
product.#:能匹配product.insert、product.update、product.insert.count
product.*:能匹配product.insert、product.update
同样
#和*可以用来前面,#.product或者*.product,效果同上述。
模型图:
图解:
- 红色Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
- 黄色Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
编写消息生产者和消费者
消息生产者
package cn.rabbitmq.topic;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class TopicProducer {
//交换机名称
static final String TOPIC_EXCHANGE = "topic_exchange_demo1";
//产品 队列名称
static final String TOPIC_QUEUE_PRODUCT = "topic_queue_product";
//订单 队列名称
static final String TOPIC_QUEUE_ORDER = "topic_queue_order";
public static void main(String[] args) {
try {
//创建连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
// 发送信息 新增订单
String RoutingKey_order_insert="order.insert";
String message = "新增了订单。Topic模式;routing key 为 "+RoutingKey_order_insert ;
channel.basicPublish(TOPIC_EXCHANGE, RoutingKey_order_insert, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息 更新订单
String RoutingKey_order_update="order.update";
message = "更新了订单。Topic模式;routing key 为 "+RoutingKey_order_update ;
channel.basicPublish(TOPIC_EXCHANGE, RoutingKey_order_update, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息 更新订单金额
String RoutingKey_order_update_money="order.update.money";
message = "更新了订单。Topic模式;routing key 为 "+RoutingKey_order_update_money ;
channel.basicPublish(TOPIC_EXCHANGE, RoutingKey_order_update_money, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息 删除商品
String RoutingKey_product_delete="product.delete";
message = "删除了商品。Topic模式;routing key 为 "+RoutingKey_product_delete ;
channel.basicPublish(TOPIC_EXCHANGE, RoutingKey_product_delete, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息 更新商品
String RoutingKey_product_update="product.update";
message = "更新了商品。Topic模式;routing key 为 "+RoutingKey_product_update ;
channel.basicPublish(TOPIC_EXCHANGE, RoutingKey_product_update, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息 更新商品的金额
String RoutingKey_product_update_money="product.update.money";
message = "更新了商品。Topic模式;routing key 为 "+RoutingKey_product_update_money ;
channel.basicPublish(TOPIC_EXCHANGE, RoutingKey_product_update_money, null, message.getBytes());
System.out.println("已发送消息:" + message);
//关闭资源
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
商品消息消费者
package cn.rabbitmq.topic;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicConsumer_product {
//交换机名称
static final String TOPIC_EXCHANGE = "topic_exchange_demo1";
//队列名称
static final String TOPIC_QUEUE_PRODUCT = "topic_queue_product";
public static void main(String[] args) {
try {
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE_PRODUCT, true, false, false, null);
// 队列 绑定交换机
channel.queueBind(TOPIC_QUEUE_PRODUCT, TOPIC_EXCHANGE, "product.#");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("商品接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(TOPIC_QUEUE_PRODUCT, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
订单消息消费者
package cn.rabbitmq.topic;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicConsumer_order {
//交换机名称
static final String TOPIC_EXCHANGE = "topic_exchange_demo1";
//队列名称
static final String TOPIC_QUEUE_ORDER = "topic_queue_order";
public static void main(String[] args) {
try {
Connection connection= ConnectionUtil.getConnection();
//创建频道
Channel channel=connection.createChannel();
//声明交换机
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(TOPIC_QUEUE_ORDER, true, false, false, null);
// 队列 绑定交换机
channel.queueBind(TOPIC_QUEUE_ORDER, TOPIC_EXCHANGE, "order.update");
channel.queueBind(TOPIC_QUEUE_ORDER, TOPIC_EXCHANGE, "order.insert");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("订单接收到的消息为:" + new String(body, "utf-8"));
}
};
channel.basicConsume(TOPIC_QUEUE_ORDER, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行 验证交换机只发送消息,而不存储消息
首先运行消息生产者
然后打开可视化界面
然后分别运行商品、订单消息消费者,发现:
订单消息消费者:
商品消息消费者:
没有任何的消息
注意:在通配模式之前,我们在消息生产者的代码中,我们都是先将消息队列声明,然后根据不同的routingkey绑定在交换机上;
而现在在消息生产者的代码中并没有声明消息队列,更没有绑定消息队列,而是在消息消费者中声明消息队列并绑定通配的路由key;
然后我们执行发送消息,由于该交换机上并没有任何绑定的消息队列,那么刚才发送的6条消息就会被丢弃,之后才启动消息消费者,绑定消息队列。
这就验证了交换机不存储消息,只负责发送消息。
再次运行消息生产者
运行消息生产者:
订单消息消费者接受到两条匹配上的key:
商品消息消费者接收到匹配上的三条消息
可视化界面查看绑定
Spring Boot整合RabbitMQ 简介
在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ
尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
- 生产者工程:
- application.yml文件配置RabbitMQ相关信息;
- 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
- 注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
- 消费者工程:
- application.yml文件配置RabbitMQ相关信息
- 创建消息处理类,用于接收队列中的消息并进行处理
大部分内容都在生产者的工程中,消费者工程极简!
搭建消息提供者工程 创建消息提供者项目
过程略(创建一个springboot工程即可)
添加依赖编写启动类org.springframework.boot spring-boot-starter-amqp org.springframework.cloud spring-cloud-stream-binder-rabbit org.springframework.boot spring-boot-starter-test
package cn.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqProductApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqProductApplication.class,args);
}
}
配置消息提供者项目
配置application.yml文件
spring:
rabbitmq:
port: 5672
username: guest
password: guest
host: localhost
virtual-host: /blog
绑定交换机和队列
注意配置注解一定不要忘记加@Configuration
package cn.rabbitmq.provider;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQProductConfig {
//交换机名称
public static final String Blog_EXCHANGE="blog_exchange";
//产品队列名称
public static final String PRODUCT_QUEUE="product_queue";
//订单队列名称
public static final String ORDER_QUEUE="order_queue";
@Bean("blogExchange")
public Exchange blog_exchange(){
//类型为通配模式 持久化
return ExchangeBuilder.topicExchange(Blog_EXCHANGE).durable(true).build();
}
@Bean("productQueue")
public Queue productQueue(){
return QueueBuilder.durable(PRODUCT_QUEUE).build();
}
@Bean("orderQueue")
public Queue orderQueue(){
return QueueBuilder.durable(ORDER_QUEUE).build();
}
@Bean
public Binding productQueueBlogExchangeBind(@Qualifier("blogExchange") Exchange exchange,@Qualifier("productQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("product.*").noargs();
}
@Bean
public Binding orderQueueEbuyExchangeBind(@Qualifier("blogExchange") Exchange exchange,@Qualifier("orderQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("order.*").noargs();
}
}
搭建消息消费者工程
创建消息消费者项目
过程略(创建一个springboot工程即可)
添加依赖org.springframework.boot spring-boot-starter-amqp
编写启动类
package cn.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqConsumerApplication.class,args);
}
}
配置消息消费者项目 配置application.yml文件
spring:
rabbitmq:
port: 5672
username: guest
password: guest
host: localhost
virtual-host: /blog
配置消息监听处理类
package cn.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
@RabbitListener(queues = "product_queue")
public void productListener(String message){
System.out.println(message);
}
@RabbitListener(queues = "order_queue")
public void orderListener(String message){
System.out.println(message);
}
}
测试
在消息生产者的工程中创建一个测试类
package cn.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestRabbitmq {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testProductQueue(){
rabbitTemplate.convertAndSend("blog_exchange",
"product.insert", "商品新增,routing key 为product.insert");
}
@Test
public void testOrderQueue(){
rabbitTemplate.convertAndSend("blog_exchange",
"order.update", "订单更新,routing key 为order.update");
}
}
然后运行该测试,然后启动消费者项目,如下图
消费者工程取得消息
至此本文结束!!!



