- 1. RocketMQ基本原理:
- 1.1 NameServer基本认识
- 1.2 Broker、Producer、Consumer 与NameServer的通信
- 1.3 Topic 基本原理
- 1.3.1 perm 参数的含义
- 1.3.2 Topic 收发消息原理
- 2. 使用RocketMQ原生API收发消息代码
- 2.1 同步消息
- 2.2 延迟消息
- 2.3 顺序消息
- 2.4 分布式事务消息
- NameServer主要用于存储Topic,Broker关系信息,功能简单,稳定性高。
- 各个NameServer节点之间不相关,不需要通信,单台宕机不影响其它节点。
- NameServer集群整体宕机不影响已建立关系的Concumer,Producer,Broker。
- 每个Borker和所有NameServer保持 长连接,心跳间隔为 30秒 。每次心跳时还会携带当前的Topic信息。当某个Broker 两分钟 之内没有心跳,则认为该Broker 下线 ,并调整内存中与该Broker相关的Topic信息。
- Consumer 从 NameServer 获得 Topic 的路由信息,与对应的 Broker 建立长连接。间隔 30秒 发送心跳至Broker。Broker检查若发现某 Consumer 两分钟 内无心跳则认为该Consumer 下线 ,并通知该Consumer所有的消费者集群中的其他实例,触发该消费者集群重新负载均衡。
- Producer 与消费者一样,也是从 NameServer 获得 Topic 的路由信息,与对应的 Broker 建立长连接, 30秒 发送一次心跳。Broker 也会认为 两分钟 内没有心跳的 Producer 下线 。
在Topic上,读和写队列数量默认一致(默认都为4)
1.3.1 perm 参数的含义perm 参数是设置队列的读写权限
1.3.2 Topic 收发消息原理6 : 同时开启读写
4 : 禁写
2 : 禁读
生产者将消息发送到 Topic1 的其中一个 写队列 ,消费者从对应的一个 读队列 接收消息。
生产者的负载均衡:
生产者以 轮询 的方式向所有写队列发送消息,这些队列可能会分布在多个broker实例上。
消费者的负载均衡:
一个 group 中的多个消费者,可以以 负载均衡 的方式来接收消息。
读取队列被均匀分配给这些消费者,它们从指定的队列来接收消息。队列的分配可以采用不同的策略
- 策略1: AllocateMessageQueueAveragely 平均分配(默认策略)
- 策略2: AllocateMessageQueueAveragelyByCircle 环形分配
- 策略3: AllocateMessageQueueConsistentHash 一致性哈希
添加 rocketmq-client 依赖
Topic: 主题相当于是消息的分类, 一类消息使用一个主题
Tag: 相当于是消息的二级分类, 在一个主题下, 可以通过 tag 再对消息进行分类
生产者:
- 创建生产者对象
- 设置nameserver连接地址:setNamesrvAddr
- 封装消息对象,设置Topic和Tag
- 设置消息延迟级别(可选)
- 发送消息.得到反馈
消费者:
- 创建消费者对象,push 和 pull
- 设置nameserver连接地址:setNamesrvAddr
- 设置订阅消息地址subscribe(topic,tag)
多个 Tag 可以这样写: TagA || TagB || TagC
不指定 Tag,或者说接收所有的 Tag,可以写星号: * - 设置消息监听器setMessageListener获取消息
Concurrently:会启动多个线程处理消息
Orderly 单线程按照顺序接收消息
SUCCESS:返回消费成功
RECONSUME_LATER:稍后(根据延时级别递增)重新消费
同步消息发送要保证强一致性,发到master的消息向slave复制后,才会向生产者发送反馈信息。
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
消息发送到 Rocketmq 服务器后, 延迟一定时间再向消费者进行投递。
生产者发送消息时,对消息进行延时设置:
msg.setDelayTimeLevel(3); //3 代表级别而不是一个具体的时间值2.3 顺序消息
同一组 有序的消息序列,会被发送到 同一个队列 ,按照 FIFO 的方式进行处理
一个队列只允许一个消费者线程接收消息,这样就保证消息按顺序被接收
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);
2.4 分布式事务消息
原理:
RocketMQ 提供了可靠性消息,也叫事务消息。
- 生产者先向服务器发送一条"半消息"(半消息不会发送给消费者)
- 然后执行本地事务
- 事务执行成功,提交消息,然后消息发送给消费者,如果消费者消费失败,可以自动重发(按照延迟消息的级别反复尝试),如果最终仍然失败,还可以交给人工手动处理.
- 事务执行失败,回滚消息,消费者不会收到该消息
- (提交和回滚针对的是消息)
- 当生产者因为某种原因迟迟没有向服务器发送提交或回滚操作时,服务器会每隔一分钟询问生产者事务的状态,称为"回查",无限制次数一直回查.
实现:
生产者:
- 这里的生产者为TransactionMQProducer,事务消息
- 设置事务监听器:setTransactionListener, producer.setTransactionListener(new TransactionListener()
- 重写executeLocalTransaction方法: 执行本地事务然后返回状态
- 重写checkLocalTransaction方法: 处理服务器的事务回查
- 发送事务消息,参数:消息对象和业务数据,会把两个参数传递到事务监听器的两个方法中.
消费者:
同普通消费者,无特殊写法



