- RabbitMQ
- 一、首次安装
- 普通部署
- Docker部署
- 二、基本使用
- 2.1 第一个示例工程
- 公共部分
- 生产者
- 消费者
- 效果
- 总结流程
- 2.2 正式介绍
- 2.3 MQ工作模式
- 2.4 交换机类型
- 三、进阶玩法
- 3.1 消息的可靠性投递
- 3.2 Return回退机制
- 3.3 TTL消息超时
- 3.4 死信队列
- 3.5 延迟队列
- 3.6 优先级队列
- 3.7 消息百分百成功投递
- 四、三方框架集成
- 4.1 SpringBoot
- 4.2 SpringCloudStream
- 五、原理分析
- 没写完
- over
下载地址 - Release RabbitMQ 3.6.14 · rabbitmq/rabbitmq-server
# 安装erlang yum -y update yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel yum -y install epel-release yum -y install socat yum -y install erlang # 检查erl版本 erl -version # 安装RabbitMQ - 先去官网下载RPM包 rabbitmq-server-3.6.14-1.el6.noarch.rpm yum install -y net-tools rpm -Uvh rabbitmq-server-3.6.14-1.el6.noarch.rpm # 启动MQ并关闭防火墙 systemctl start rabbitmq-server systemctl status rabbitmq-server iptables -F # Rabbit的默认通讯端口是5672 # 安装Web控制台 rabbitmq-plugins enable rabbitmq_management # 设置初始用户并设置为管理员 rabbitmqctl add_user admin admin rabbitmqctl set_user_tags admin administrator rabbitmqctl set_permissions -p / admin “.*” “.*” “.*” # 访问Web控制台 http://192.168.247.177:15672Docker部署
两步完事
docker pull rabbitmq:management docker run -dit --name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management二、基本使用 2.1 第一个示例工程 公共部分
依赖导入
org.springframework.boot spring-boot-starter-test 2.3.4.RELEASE junit junit 4.13 test org.springframework.boot spring-boot-starter 2.3.4.RELEASE org.springframework.boot spring-boot-test 2.3.4.RELEASE org.springframework.boot spring-boot-starter-amqp 2.3.4.RELEASE
application.yaml
spring:
rabbitmq:
host: 192.168.247.173
port: 5672
username: admin
password: admin
virtual-host: /
生产者
做常规启动类,然后写个配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "Demo01_direct_exchange";
public static final String QUEUE_NAME = "Demo01_queue";
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,
@Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("routingKeyForDemo01").noargs();
}
}
Test发送消息到MQ
import com.ljm.ProducerSpringBootApplication;
import com.ljm.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = ProducerSpringBootApplication.class)
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend01() throws InterruptedException {
rabbitTemplate.convertAndSend
(RabbitMQConfig.EXCHANGE_NAME,
"routingKeyForDemo01",
"is Demo01 Message");
}
}
消费者
做常规启动类,做监听器监听消息,然后运行启动类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbimtMQListener {
@RabbitListener(queues = "Demo01_queue")
public void listenerQueue(Message message){
System.out.println(new String(message.getBody()));
}
}
效果
在消费者上收到消息
总结流程- 做好交换机和队列的绑定关系、交换机和队列的基础配置,可以在提供者上进行设置或走页面配置
- 提供者:指定交换机名称、routingKey进行发送消息
- 消费者:监听指定队列的消息
- 消费者提供者的数据流转在节点的过程,涉及到了序列化,中间件已经帮我们处理
根据上面的配置,相信大家对消息队列应该有个大概的认识,下面概括一下这玩意
首先我们先说一下消息中间件的主要的作用:
[1]异步处理
[2]解耦服务
[3]流量削峰
上面的三点是我们使用消息中间件最主要的目的
什么是QPS,PV , UV , PR
- QPS:每秒查询率,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准
- PV:页面浏览量
- UV:访问某个站点或点击某条新闻的不同IP地址的人数
- PR:即PageRank,网页的级别技术,用来标识网页的等级/重要性。级别从1到10级,10级为满分。PR值越高说明该网页越受欢迎(越重要)
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS
-
AMQP
即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等
-
JMS
即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信
-
AMQP 与 JMS 区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模式;而AMQP的消息模式更加丰富
官网说明了有6种工作模式
-
简单模式
一比一的发送,在实际生产没有什么大用处
-
Work queues 工作队列模式
一个提供者,多个消费者,消费者之间对于同一个消息的关系是竞争的关系
对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度,只需要有一个节点成功发送即可
-
Publish/Subscribe 发布/订阅模式
类似于广播,谁订阅了都能收到
-
Routing 路由模式
队列和交换机的routingKey要完全一致才能收到消息
-
Topics 通配符模式
队列和交换机的routingKey模糊匹配一直就能收到消息
-
RPC模式
远程调用,我没用到
-
DIRECT
定向,把消息发送到与这个交换机绑定的队列里面,路由键要求一致
-
FANOUT
扇形(广播),发送消息到每一个与之绑定队列。
-
TOPIC
通配符的方式,其中*用于匹配一个单词,#用于匹配多规格单词(可以为零个)
-
HEADERS
参数匹配,不依赖路由键的匹配规则,而是根据消息内容的headers属性进行匹配,但是比较消耗性能,一般没人用
正常情况下,RabbitMQ提供了两种解决方案
- 事务机制
- 发送方确认
- 自动确认 acknowledge=“none”
- 手动确认 acknowledge=“manual”
一般我们不推荐使用事务机制,因为那样会严重降低MQ的消息吞吐量
3.2 Return回退机制在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到
这个时候我们需要监听这种不可达的消息,就要使用return listener
3.3 TTL消息超时1
3.4 死信队列什么是死信队列
顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列
成为死信队列的三种情况
- 队列消息长度到达限制
- 消费者拒接消费消息
- 原队列存在消息过期设置,消息到达超时时间未被消费
常规的处理方案大概是以下几种
- 丢掉
- 记录死信入库,然后做后续的业务分析或处理
- 通过死信队列,由负责监听死信的应用程序进行处理
从字面意思分析,这条队列是消息发送出去以后,消费者不会即刻得到消息,而是到了指定时间后才能拿到消息进行消费
3.6 优先级队列1
3.7 消息百分百成功投递对于一些重要的消息,我们是希望它必须被消费的,除了宕机这种无法挽救的结局,所以我们需要采取措施
- 生产者:消息信息存入数据库,并将消息记录存到数据库中的消息记录表(表中存在消息投递状态字段status)
- 生产者:使用/confirm/i手动确认的方式发送消息到MQ节点
- 消费者:收到消息,执行回调
- 生产者:收到消息成功被消费的回调,修改消息记录表中的消息状态字段status=1
这里可能会发生以下问题
- 由于网络闪断导致消息未成功发送/收到
- 消息重复消费
解决方案
-
消费未收到
可以通过定时任务进行反复推送确认,并且,可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)
-
消息重复消费
可以采用乐观锁
1
1
4.2 SpringCloudStream1
五、原理分析1
没写完 over


