MQ(Message Queue)消息队列,是一种用来保存消息数据的队列
队列:数据结构的一种,特征为 “先进先出”
1.2何为消息服务器间的业务请求
原始架构: 服务器中的 A 功能需要调用 B 、 C 模块才能完成 微服务架构: 服务器 A 向服务器 B 发送要执行的操作(视为消息) 服务器 A 向 服务器 C 发送 要执行的 操作 (视为消息 ) 1.3MQ作用1.应用解耦(异步消息发送)
MQ基本工作模式
2.快速应用变更维护(异步消息发送)
3.流量削锋(异步消息发送)
1.4MQ优缺点分析优点(作用)
应用 解耦 快速应用变更 维护 流量削 锋缺点
系统 可用性 降低 系统 复杂度 提高 异步消息机制 消息 顺序性 消息丢失 消息 一致性 消息 重复 使用1.5MQ产品介绍
ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术metaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)
解决所有缺点
2.环境搭建 2.1基础概念 2.2安装下载:https://www.apache.org/
3.消息发送 3.1消息发送与接收开发流程 1. 谁 来发? 2. 发给 谁? 3. 怎么 发 ? 4. 发什么? 5. 发的结果是什么? 6. 打扫 战场 3.2单生产者单消费者消息发送
导入RocketMQ客户端坐标
生产者
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.175.130:9876");
//3.1启动发送的服务
producer.start();
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
//3.2发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
//5.关闭连接
producer.shutdown();
注意:关闭服务器防火墙
systemctl stop firewalld.service
消费者
//1.创建一个接收消息的对象Comsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接受的命名服务器地址
consumer.setNamesrvAddr("192.168.175.130:9876");
//3.设定接收消息对应的topic
consumer.subscribe("topic1","*");
//3.开启监听 用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for (MessageExt messageExt : list) {
System.out.println("收到消息:"+messageExt);
System.out.println("消息:"+new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已开启运行");
3.3单生产者多消费者消息发送
消费者
//1.创建一个接收消息的对象Comsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接受的命名服务器地址
consumer.setNamesrvAddr("192.168.175.130:9876");
//3.设定接收消息对应的topic
consumer.subscribe("topic1","*");
//设置当前消费者的消费模式(默认模式 负载均衡)
// consumer.setMessageModel(MessageModel.CLUSTERING);
//设置当前消费者的消费模式为广播模式(所有客户端接收到的消息都是一样的)
consumer.setMessageModel(MessageModel.BROADCASTING);
//3.开启监听 用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for (MessageExt messageExt : list) {
System.out.println("收到消息:"+messageExt);
System.out.println("消息:"+new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已开启运行");
均衡负载
广播模式
3.4多生产者多消费者消息发送多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费
3.5消息类别 3.5.1同步消息特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.175.130:9876");
producer.start();
for (int i=1;i<=5;i++) {
同步消息发送
Message msg = new Message("topic2",("同步消息:hello rocketmq"+i).getBytes("UTF-8"));
SendResult send = producer.send(msg);
System.out.println("返回结果"+send);
}
//添加一个休眠操作,确保异步消息返回后能够输出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
3.5.2异步消息
特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息
(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.175.130:9876");
producer.start();
for (int i=1;i<=5;i++) {
//异步消息发送
Message msg = new Message("topic2",("异步消息:hello rocketmq"+i).getBytes("UTF-8"));
producer.send(msg,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
}
//添加一个休眠操作,确保异步消息返回后能够输出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
3.5.3单项消息
特征:不需要有回执的消息,例如日志类消息
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.175.130:9876");
producer.start();
for (int i=1;i<=5;i++) {
//单向消息发送
Message msg = new Message("topic2",("单项消息:hello rocketmq"+i).getBytes("UTF-8"));
producer.sendoneway(msg);
}
//添加一个休眠操作,确保异步消息返回后能够输出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
3.5.4延时消息
消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.175.130:9876");
producer.start();
for (int i=1;i<=5;i++) {
//同步消息发送
Message msg = new Message("topic2",("非延时消息:hello rocketmq"+i).getBytes("UTF-8"));
//设置当前消息的延时效果
msg.setDelayTimeLevel(3);
SendResult send = producer.send(msg);
System.out.println("返回结果"+send);
}
producer.shutdown();
}
}
目前支持的消息时间
u 秒级: 1 , 5 , 10 , 30 u 分级: 1~10 , 20 , 30 u 时级: 1 , 21s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
3.5.5批量消息public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.175.130:9876");
producer.start();
//创建一个集合保存多个消息
List msgList = new ArrayList();
Message msg1 = new Message("topic3",("批量消息:hello rocketmq111").getBytes("UTF-8"));
Message msg2 = new Message("topic3",("批量消息:hello rocketmq222").getBytes("UTF-8"));
Message msg3 = new Message("topic3",("批量消息:hello rocketmq333").getBytes("UTF-8"));
msgList.add(msg1);
msgList.add(msg2);
msgList.add(msg3);
//批量消息发送(每次发送的消息总量不得超过4M)
//消息的总长度包含4个信息:topic,body,消息的属性,日志(20字节)
SendResult send = producer.send(msgList);
System.out.println("返回结果"+send);
producer.shutdown();
}
}
3.5.6消息过滤
分类过滤
生产者
Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
消费者
//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");
消息过滤
生产者
//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");
消费者
//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
注意:SQL过滤需要依赖服务器的功能支持,在broker配置文件中添加对应的功能项,并开启对应功能
enablePropertyFilter=true
启动服务器使启用对应配置文件
3.5.7错乱的消息顺序 3.5.8顺序消息sh mqbroker -n localhost:9876 -c ../conf/broker.conf
发送消息
//设置消息进入到指定的消息队列中
for(final Order order : orderList){
Message msg = new Message("orderTopic",order.toString().getBytes());
//发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector() {
//设置当前消息发送时使用哪一个消息队列
public MessageQueue select(Listlist, Message message, Object o) {
//根据发送的信息不同,选择不同的消息队列
//根据id来选择一个消息队列的对象,并返回->id得到int值
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
接收消息
3.5.9事务消息//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
//使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务/ /,转化为一个消息队列一个线程服务
public ConsumeOrderlyStatus consumeMessage(Listlist, ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt msg : list){
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
事务消息状态
提交状态:允许进入队列,此消息与非事务消息无区别
回滚状态:不允许进入队列,此消息等同于未发送过
中间状态:完成了half消息的发送,未对MQ进行二次状态确认
注意:事务消息仅与生产者有关,与消费者无关
public static void main1(String[] args) throws Exception {
//事务消息使用的生产者TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.175.130:9876");
//添加本地事务的监听
producer.setTransactionListener(new TransactionListener() {
//正常的事务过程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.COMMIT_MESSAGE;
}
//事务补偿过程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic7",("事务消息:hello rocketmq").getBytes("UTF-8"));
SendResult send = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果"+send);
producer.shutdown();
}
public static void main2(String[] args) throws Exception {
//事务消息使用的生产者TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.175.130:9876");
//添加本地事务的监听
producer.setTransactionListener(new TransactionListener() {
//正常的事务过程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//事务补偿过程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8",("事务消息:hello rocketmq").getBytes("UTF-8"));
SendResult send = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果"+send);
producer.shutdown();
}
public static void main(String[] args) throws Exception {
//事务消息使用的生产者TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.175.130:9876");
//添加本地事务的监听
producer.setTransactionListener(new TransactionListener() {
//正常的事务过程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.UNKNOW;
}
//事务补偿过程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("事务补偿过程执行");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("topic9",("事务消息:hello rocketmq").getBytes("UTF-8"));
SendResult send = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果"+send);
//实物补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿
// producer.shutdown();
}
4.集群搭建
4.1RocketMQ集群分类
4.2RocketMQ集群特征
4.3RocketMQ集群工作流程
步骤1:NameServer启动,开启监听,等待broker、producer与consumer连接
步骤2:broker启动,根据配置信息,连接所有的NameServer,并保持长连接
步骤2补充:如果broker中有现存数据, NameServer将保存topic与broker关系
步骤3:producer发信息,连接某个NameServer,并建立长连接
步骤4:producer发消息
u 步骤 4.1 若果 topic 存在,由 NameServer 直接分配 u 步骤 4.2 如果 topic 不存在,由 NameServer 创建 topic 与 broker 关系,并分配步骤5:producer在broker的topic选择一个消息队列(从列表中选择)
步骤6:producer与broker建立长连接,用于发送消息
步骤7:producer发送消息
comsumer工作流程同producer
4.4双主双从集群搭建 5.高级特性 5.1消息的存储 ① 消息 生成者发送 消息到 MQ ② MQ 返回 ACK给 生产者 ③ MQ push 消息给对应的 消费者 ④ 消息消费者返回 ACK 给 MQ说明:ACK(Acknowledge character)
① 消息 生成者发送消息到 MQ ② MQ 收到消息,将消息进行持久化 ,存储该消息 ③ MQ 返回ACK给 生产者 ④ MQ push 消息给对应的消费者 ⑤ 消息消费者返回 ACK 给 MQ ⑥ MQ 删除消息
注意:
① 第⑤步 MQ 在指定 时间 内接到消息消费者返回 ACK ,MQ认定消息 消费成功 ,执行⑥ ② 第⑤步 MQ 在指定时间 内未接到 消息消费者返回 ACK ,MQ认定消息 消费失败,重新执行④⑤⑥
数据库
ActiveMQ 缺点:数据库瓶颈将成为 MQ 瓶颈文件系统
RocketMQ /Kafka/RabbitMQ 解决方案:采用消息 刷 盘机制进行数据存储 缺点:硬盘损坏的问题无法避免5.2高效的消息存储妤读写方式
5.3消息存储结构
MQ数据存储区域包含如下内容
消息数据存储区域 topic queueId message 消费逻辑队列 minOffset maxOffset consumerOffset 索引 key 索引 创建时间索引 ……5.4刷盘机制
同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)
配置方式
5.5高可用性#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
nameserver
无状态 + 全服务器注册消息服务器
主从架构( 2M-2S )消息生产
生产者将相同的 topic 绑定到多个 group 组,保障 master 挂掉后,其他 master 仍可正常进行消息接收消息消费
RocketMQ 自身会根据 master 的压力确认是否由 master 承担消息读取的功能,当 master 繁忙时候,自动切换由 slave 承担数据读取的工作 5.6主从数据复制同步复制
master 接到消息后,先复制到 slave ,然后反馈给生产者写操作成功 优点:数据安全,不丢数据,出现故障容易恢复 缺点:影响数据吞吐量,整体性能低异步复制
master 接到消息后,立即返回给 生产者写操作 成功,当消息达到一定 量 后再异步复制到 slave 优点 :数据吞吐量大,操作延迟低,性能高 缺点:数据不安全,会出现数据丢失的现象,一旦 master 出现故障,从上次数据同步到故障时间的数据将丢失配置方式
5.7负载均衡#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
Producer负载均衡
内部实现了不同 broker 集群中对同一 topic 对应消息队列的负载均衡
Consumer负载均衡
平均分配 循环平均分配 5.8消息重试当消息消费后未正常返回消费成功的信息将启动消息重试机制
消息重试机制
顺序消息 当 消费者消费消息失败后 , RocketMQ 会自动进行 消息重试(每次间隔时间为 1 秒 ) 注意:应用 会出现消息消费被阻塞的 情况,因此,要对顺序消息的消费情况进行监控,避免阻塞 现象的 发生 无序消息死信队列
当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message)
死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
死信队列特征
归属某一个组( Gourp Id ),而不归属 Topic ,也不归属消费者 一个死信队列中可以包含同一个组下的多个 Topic 中的死信消息 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化死信队列中消息特征
不会被再次重复消费 死信队列中的消息有效期为 3 天,达到时限后将被 清除死信处理:在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费
5.9消息重复消费
---------------------------------------------------------------------------------------------------------------------------------
内容有部分存在书籍、课堂、网络记录,如有雷同纯属巧合



