首先要先了解一下为什么要引入RocketMQ消息队列是什么?
比如公司本身的业务体量很小,所以直接单机一把梭哈都能搞定了,但是后面业务体量不断扩大,采用微服务的设计思想,分布式的部署方式,所以拆分了很多的服务,随着体量的增加以及业务场景越来越复杂了,很多场景单机的技术栈和中间件以及不够用了,而且对系统的友好性也下降了,最后做了很多技术选型的工作,我们决定引入消息队列中间件。
主要功能:异步、削峰、解耦
RocketMQ:
它是一款分布式、队列模型(queue)的消息中间件,是Alibaba自主研发的专业消息中间件,实现了业务消峰、分布式事务的优秀框架。
2.安装工欲善其事必先利其器
- 官网:https://rocketmq.apache.org/
image-20211207192901447.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WSxb0pD4-1638880091963)(C:Users王元元AppDataRoamingTyporatypora-user-imagesimage-20211207193256601.png)]
简单来讲,binary是编译好的可以直接使用,source是还没编译过的源代码,需要自行编译。
4.解压
5.配置环境变量
6
ROCKETMQ_HOME="D:rocketmq" NAMESRV_ADDR="localhost:9876"
# 启动 nameserver .binmqnamesrv.cmd # 启动broker .binmqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true3.使用 3.1案例一
-
创建maven quickstart项目
-
添加依赖
org.apache.rocketmq rocketmq-client 4.9.1 -
发送
package com.woniuxy.cloud.simple; import com.woniuxy.cloud.AppConstants; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.Scanner; public class Sender { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException { //(1)创建生产者 DefaultMQProducer producer = new DefaultMQProducer("TestSender"); producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR); //(2)启动producer producer.start(); //(3)构建消息并发送 Scanner scanner = new Scanner(System.in); while (true) { System.out.println("请输入要发送的消息"); String smsContent = scanner.next(); if (smsContent.equals("exit")) { //(4)关闭producer producer.shutdown(); } Message msg = new Message(AppConstants.SMS_TOPIC, "user_register", smsContent.getBytes("UTF-8")); //同步发送到RocketMQ SendResult sendResult = producer.send(msg); System.out.println("sendResult:" + sendResult); } } } -
接收
package com.woniuxy.cloud.simple;
import com.woniuxy.cloud.AppConstants;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class Receiver {
public static void main(String[] args) throws MQClientException {
//(1)创建消费者实例
//消费者分组,同一个名字的消费者组成一个集群
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestReceive");
consumer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
//(2)订阅某个主题,收到特定的消息
consumer.subscribe(AppConstants.SMS_TOPIC,"*");
//(3)向MQ注册一个监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msgExt:msgs){
try {
System.out.println("消息内容:"+new String(msgExt.getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// (4)启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
3.2发送的三种模式
1.发送-同步确认发送结果
同步发送是指消息发送方发出一条消息后,会在收到服务端响应后才发吓一条的通讯方式。
- 应用场景:此场景应用非常广泛,ex:重要的通知邮件、报名短信通知、营销短信系统等。
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
- 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
对比
他的优缺点是啥 RocketMQ优点:单机吞吐量:十万级
可用性:非常高,分布式架构
消息可靠性:经过参数优化配置,消息可以做到0丢失
功能支持:MQ功能较为完善,还是分布式的,扩展性好
支持10亿级别的消息堆积,不会因为堆积导致性能下降
源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ
RocketMQ缺点:支持的客户端语言不多,目前是java及c++,其中c++不成熟
社区活跃度不是特别活跃那种
没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
消息类型分类:普通消息、顺序消息、延时消息、事务消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
延时消息比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java
付款就取消订单释放库存。
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java



