栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他

RocketMQ

其他 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQ

文章目录
  • 1. RocketMQ基本原理:
    • 1.1 NameServer基本认识
    • 1.2 Broker、Producer、Consumer 与NameServer的通信
    • 1.3 Topic 基本原理
      • 1.3.1 perm 参数的含义
      • 1.3.2 Topic 收发消息原理
  • 2. 使用RocketMQ原生API收发消息代码
    • 2.1 同步消息
    • 2.2 延迟消息
    • 2.3 顺序消息
    • 2.4 分布式事务消息

1. RocketMQ基本原理:

1.1 NameServer基本认识
  1. NameServer主要用于存储Topic,Broker关系信息,功能简单,稳定性高。
  2. 各个NameServer节点之间不相关,不需要通信,单台宕机不影响其它节点。
  3. NameServer集群整体宕机不影响已建立关系的Concumer,Producer,Broker。
1.2 Broker、Producer、Consumer 与NameServer的通信
  1. 每个Borker和所有NameServer保持 长连接,心跳间隔为 30秒 。每次心跳时还会携带当前的Topic信息。当某个Broker 两分钟 之内没有心跳,则认为该Broker 下线 ,并调整内存中与该Broker相关的Topic信息。
  2. Consumer 从 NameServer 获得 Topic 的路由信息,与对应的 Broker 建立长连接。间隔 30秒 发送心跳至Broker。Broker检查若发现某 Consumer 两分钟 内无心跳则认为该Consumer 下线 ,并通知该Consumer所有的消费者集群中的其他实例,触发该消费者集群重新负载均衡。
  3. Producer 与消费者一样,也是从 NameServer 获得 Topic 的路由信息,与对应的 Broker 建立长连接, 30秒 发送一次心跳。Broker 也会认为 两分钟 内没有心跳的 Producer 下线
1.3 Topic 基本原理

在Topic上,读和写队列数量默认一致(默认都为4)

1.3.1 perm 参数的含义

perm 参数是设置队列的读写权限

6 : 同时开启读写
4 : 禁写
2 : 禁读

1.3.2 Topic 收发消息原理

生产者将消息发送到 Topic1 的其中一个 写队列 ,消费者从对应的一个 读队列 接收消息。
生产者的负载均衡:
生产者以 轮询 的方式向所有写队列发送消息,这些队列可能会分布在多个broker实例上。
消费者的负载均衡:
一个 group 中的多个消费者,可以以 负载均衡 的方式来接收消息。
读取队列被均匀分配给这些消费者,它们从指定的队列来接收消息。队列的分配可以采用不同的策略

  • 策略1: AllocateMessageQueueAveragely 平均分配(默认策略)
  • 策略2: AllocateMessageQueueAveragelyByCircle 环形分配
  • 策略3: AllocateMessageQueueConsistentHash 一致性哈希
2. 使用RocketMQ原生API收发消息代码

添加 rocketmq-client 依赖
Topic: 主题相当于是消息的分类, 一类消息使用一个主题
Tag: 相当于是消息的二级分类, 在一个主题下, 可以通过 tag 再对消息进行分类
生产者:

  1. 创建生产者对象
  2. 设置nameserver连接地址:setNamesrvAddr
  3. 封装消息对象,设置Topic和Tag
  4. 设置消息延迟级别(可选)
  5. 发送消息.得到反馈

消费者:

  1. 创建消费者对象,push 和 pull
  2. 设置nameserver连接地址:setNamesrvAddr
  3. 设置订阅消息地址subscribe(topic,tag)
    多个 Tag 可以这样写: TagA || TagB || TagC
    不指定 Tag,或者说接收所有的 Tag,可以写星号: *
  4. 设置消息监听器setMessageListener获取消息
    Concurrently:会启动多个线程处理消息
    Orderly 单线程按照顺序接收消息
    SUCCESS:返回消费成功
    RECONSUME_LATER:稍后(根据延时级别递增)重新消费
2.1 同步消息

同步消息发送要保证强一致性,发到master的消息向slave复制后,才会向生产者发送反馈信息。
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

2.2 延迟消息

消息发送到 Rocketmq 服务器后, 延迟一定时间再向消费者进行投递。
生产者发送消息时,对消息进行延时设置:

msg.setDelayTimeLevel(3);	//3 代表级别而不是一个具体的时间值
2.3 顺序消息

同一组 有序的消息序列,会被发送到 同一个队列 ,按照 FIFO 的方式进行处理
一个队列只允许一个消费者线程接收消息,这样就保证消息按顺序被接收

   
SendResult result = producer.send(message, new MessageQueueSelector() {
     
    @Override
    public MessageQueue select(List mqs, Message msg, Object arg) {
        int index = (int) (orderId % mqs.size());
        return mqs.get(index);
    }
}, orderId);
2.4 分布式事务消息

原理:
RocketMQ 提供了可靠性消息,也叫事务消息。

  1. 生产者先向服务器发送一条"半消息"(半消息不会发送给消费者)
  2. 然后执行本地事务
  3. 事务执行成功,提交消息,然后消息发送给消费者,如果消费者消费失败,可以自动重发(按照延迟消息的级别反复尝试),如果最终仍然失败,还可以交给人工手动处理.
  4. 事务执行失败,回滚消息,消费者不会收到该消息
  5. (提交和回滚针对的是消息)
  6. 当生产者因为某种原因迟迟没有向服务器发送提交或回滚操作时,服务器会每隔一分钟询问生产者事务的状态,称为"回查",无限制次数一直回查.

实现:
生产者:

  1. 这里的生产者为TransactionMQProducer,事务消息
  2. 设置事务监听器:setTransactionListener, producer.setTransactionListener(new TransactionListener()
    1. 重写executeLocalTransaction方法: 执行本地事务然后返回状态
    2. 重写checkLocalTransaction方法: 处理服务器的事务回查
  3. 发送事务消息,参数:消息对象和业务数据,会把两个参数传递到事务监听器的两个方法中.

消费者:
同普通消费者,无特殊写法

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/279550.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号