- RabbitMQ 入门到放弃——Day01
- MQ架构的设计原理
- 什么是消息中间件
- 传统的Http请求存在那些缺点
- MQ的应用场景有哪些
- 为什么需要使用MQ?
- 同步发送http请求
- 多线程与MQ方式实现异步
- 多线程处理业务逻辑
- MQ处理业务逻辑
- MQ和多线程之间的区别
- MQ消息中间件的名词
- 主流MQ区别对比
- MQ设计基础知识
- 基于多线程队列简单是实现MQ
- 基于netty实现MQ
- 消息如何保证不丢失
- 生产者投递消息,mq 宕机了如何处理
- Maven依赖
RabbitMQ主要由producer、broker、consumer三部分组成,其中producer(生产者)负责生产消息,broker(中间者)负责存储消息,consumer(消费者)负责消费消息。nameserver,负责注册、保存broker集群的注册信息、用于提供给用户查询broker的队列信息,Producer和Consumer通过NameServer可以知道Broker集群的路由信息,从而进行消息的投递和消费。
什么是消息中间件消息中间件基于队列模型实现异步/同步传输数据
作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
传统的Http请求存在那些缺点-
Http请求基于请求与响应模型,在高并发的情况下,客户端发送大量的请求到我们的服务器端可能会导致服务端处理请求堆积的情况。
-
Tomcat服务器每个请求都会有一个独立的线程,如果超过最大线程数会将该请求缓存到队列中,如果请求堆积过多可能会导致Tomcat服务器崩溃。所以一般会在入口用Nginx实现限流,整合服务保护框架。
-
HTTP请求处理业务逻辑如果比较耗时的情况下,客户端会一直等待,阻塞等待过程中会导致客户端超时发生重试策略,有可能引发幂等性等问题。
**注意事项:**接口如果是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该交给多线程或者mq处理。
- 异步发送短信
- 异步发送新人优惠券
- 处理一些比较耗时的操作
可以实现支撑高并发、异步解耦、流量削峰、降低耦合度
同步发送http请求客户端发送请求到达服务器端,服务器端会实现下面操作
- 插入数据 1s
- 发送短信提醒 3s
- 发放优惠券 3s
总共响应时间6s,可能会导致客户端阻塞6s时间,对用户的体验不是很好。
多线程与MQ方式实现异步 多线程处理业务逻辑用户向数据库中插入一条数据之后,单独开启一个线程异步发送短信和优惠券操作。
客户端只需要等待1s时间
优点:适合小项目 实现异步
缺点:对CPU资源消耗较大
MQ处理业务逻辑用户向数据库中插入一条数据之后,向MQ中投递一个消息,MQ服务器端将在消息推送给消息者异步解耦处理发送短信和优惠券。
MQ和多线程之间的区别MQ可以实现异步/解耦/流量削峰;
多线程也可以实现异步,但是消耗CPU资源,没有实现解耦。
MQ消息中间件的名词- Producer 生产者:投递消息到MQ服务器端;
- Consumer 消费者:从MQ服务器端获取消息处理业务逻辑;
- Broker MQ服务器端
- Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题
- Queue 存放消息模型 队列 先进先出 后进后出原则 数组/链表
- Message 生产者投递消息报文:json
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | kafka |
|---|---|---|---|---|
| 开发语言 | java | erlang | java | scala |
| 单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
| 时效性 | ms级 | us级 | ms级 | ms级以内 |
| 可用性 | 高(主从架构) | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
| 功能特性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低管理界面较丰富 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
多线程版本MQ;
基于网络通讯版本MQ Netty实现
基于多线程队列简单是实现MQimport com.alibaba.fastjson.JSONObject;
import java.util.concurrent.LinkedBlockingDeque;
public class ThreadMQ {
private static LinkedBlockingDeque broker = new LinkedBlockingDeque<>();
public static void main(String[] args) {
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
JSONObject data = new JSONObject();
data.put("phone", "17315372058");
broker.offer(data);
} catch (Exception e) {
}
}
}
}, "生产者");
producer.start();
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
JSONObject data = broker.poll();
if (data != null) {
System.out.println(Thread.currentThread().getName() + ",获取到数据:" + data.toJSONString());
}
} catch (Exception e) {
}
}
}
}, "消费者");
consumer.start();
}
}
基于netty实现MQ
消费者Netty客户端与NettyServer端MQ服务器端保持长连接,MQ服务器端保存消费者连接。
生产者netty客户端发送请求给nettyServer端MQ服务器端,MQ服务器端在将该消息内容发送给消费者。
生产者投递消息给MQ服务器端,MQ服务器端需要缓存该消息
消息如何保证不丢失-
MQ服务器端
消息持久化到硬盘
-
生产者
消息确认机制
必须确认消息成功写入到硬盘中,才能够认为消息投递成功
-
消费者
必须确认消息消费成功
rabbitmq 中:才会将该消息删除。
rocketmq 或者kafka 中:才会提交offset
- 生产者投递消息会将msg 消息内容记录下来,后期如果发生生产者投递消息失败;
- 可以根据该日志记录实现补偿机制;
- 补偿机制(获取到该msg 日志消息内容实现重试)
com.alibaba fastjson 1.2.62 io.netty netty-all 4.0.23.Final com.alibaba fastjson 1.2.62 org.apache.commons commons-lang3 3.11



