栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Rocket mq的一些介绍

Rocket mq的一些介绍

概念 NameServer

相当于服务的注册中心,为整个MQ集群提供服务协调和治理;可以部署多个,但是NameServer节点之间不会有通信,依靠Broker同时向所有的NameServer注册,上报状态信息来达到数据一致,其实也只是保存了Broker和Topic的关系信息数据;每个NameServer节点都保存全量的集群信息,这样就能做到NameServer的高可用; Broker

保存消息的地方,生产者(Producer)会向它发送消息,消费者(Consumer)会从它这里拉取消息;Broker会向NameServer注册自己,并且保持心跳,心跳包里包含自己所有的Topic信息,心跳包是30s发送一次,超时2min没有心跳NameServer就会让其下线。注:这个30s和2min在MQ中基本上是通用的;可以配置主备,可以一主多备,也可以多主,备只有读功能,通过不同的BrokerId来定义,Broker为0表示主,非0则表示备;Broker的主备不会自动切换,也就是说没有选举机制,可以引入Dledger来实现这一机制 Producer

消息的生产者,会向Broker发送消息;它会与某一台NameServer(随机选择)保持长连接,定时(30s)从NameServer获取Topic与Broker的关系信息,但是不会保持心跳;在获取到Topic信息之后,选择其中一台主Broker连接,发送消息,并保持心跳; Consumer

消息的消费者,从Broker拉取消息消费;也是从NameServer拉取Topic信息,然后根据自己订阅的Topic选择主、备Broker,并和它们保持心跳;消费者在向主Broker拉取消息的时候,Broker会根据读取消息、服务器是否可读等因素来建议下一次是从主Broker还是从从Broker拉取消息; Topic

消息的一级分类,像发送短信、发送邮箱这一类区别;Topic可以事先创建,也可以发送消息的时候自动创建,创建Topic的时候,可以指定Broker;一个Topic可以保存在多个Broker上,一个Broker也可以保存多个Topic类型的消息,属于多对多;producer会根据消息的Topic来选择对应的Broker进行发送,consumer也一样; Tag

消息的二级分类,根据具体的业务来分类了,比如注册短信、登录短信等等;只有分类的作用,没有其他特征了; Queue

消息传输的最小单位;如果出现消息积压,就可以增加消费者和Queue来提高消息处理的速度; 流程

启动NameServer,等待Broker注册;Broker启动之后,将自己注册到NameServer,并上报自己的Topic路由信息,随后每30s就给NameServer上报一次;Producer发送消息之前得先知道要往哪台Broker上发,所以会先从本地缓存TopicPublishInfoTable中获取Topic和Broker的路由信息,如果没有的话,就要去NameServer获取路由信息,并且每30s都会去NameServer获取路由信息,更新本地缓存;Producer获取到路由信息之后,会选择一台Broker进行长连接,并保持心跳,然后选择路由信息中的一个队列(MessageQueue)进行消息发送,Broker收到消息之后,会保存并持久化消息;Consumer获取路由信息的过程是一样的,经过客户端的负载均衡后,会选择其中一个或者几个消息队列进行消息消费; 持久化 CommitLog

消息持久化的文件,记录消息主体和元数据的文件;单个文件大小默认为1G,文件名长度为20位,左边补零,剩余为起始偏移量,例如:第一个文件的文件名为00000000000000000000,偏移量为0,文件大小为1G=1073741824,当第一个文件被写满之后,第二个文件的文件名就是00000000001073741824,偏移量为1073741824;当然也可能写不满,在写入一条消息时,会先判断是否足够的空间写入这条消息,如果空间不足的话,会创建新的CommitLog文件;一个Broker实例使用一个日志数据文件(即CommitLog)来存储; ConsumerQueue

逻辑消费队列,是作为消费消息的索引;因为CommitLog中消息是顺序写的,但是Consumer是按照Topic来消费消息的,如果遍历CommitLog文件来获取Comsumer订阅的Topic对应的消息这显然是不合理的,所以就需要记录消息的下标索引,ConsumerQueue就是用来做这个的;记录了消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的hashcode值;ConsumerQueue文件是用条目组成的,一条条目代表一条消息,单个文件包含30W个条目,每条条目是定长的,长度为20个字节,offset是8字节,size是4字节,hashcode是8字节,所以ComsumerQueue可以像数组一样随机访问某一个下标的条目,查询速度很快;ConsumerQueue文件位置在/store/consumequeue下,然后再以Topic命名文件夹,下面再以MessageQueue队列ID命名的文件夹,最后才是一个个文件ConsumerQueue是一个消费队列一个ConsumerQueue文件,看文件存储的方式我们就知道ConsumerQueue文件是一个消费队列维护一个文件,也就是说这个文件里的消息都是一个消费者应该消费的消息; IndexFile

单纯的索引文件,本质上就是一个HashMap;key是Messgae Key或Unique Key,value是消息的物理偏移量offset;提供了可以通过key或者时间区间来查询消息的方式; 页缓存和零拷贝(内存映射)

页缓存(PageCahe):跟mysql中的change buffer概念差不多,就是写的时候先只写到Cache中,不写磁盘,然后通过异步的方式由pdflush内核线程将Cache中的数据刷盘到磁盘;读的时候也会顺序地将其他相邻块的数据文件进行预读取(局部性原理);零拷贝(内存映射):这个概念在其他应用也经常用到,像Netty、Redis什么的。将磁盘上的物理文件直接映射到用户态的内存地址上,这样就可以将对文件的操作转化为直接对内存地址的操作。 消息类型 发送消息

同步消息:会返回一个发送消息的结果,适用于比较重要的消息

SendResult sendResult = producer.send(msg);

异步消息:需要传入一个回调,适用于对响应时间敏感的业务场景

producer.send(msg, new SendCallback());

单向消息:不会告知发送消息结果,适用于不太重要的消息,比如日志消息

producer.sendoneway(msg);
顺序消息

Rocket MQ是做不到全局消息有序的,但是单个Queue是遵循FIFO原则的,所以只要将顺序消息发送给同一到Queue中,这样消费者也是按序去取消息消费

	SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  
                   // 根据订单id选择发送queue
                   // mqs就是所有的Queue,id对Queue长度取模就可以保存同一id被放到同一个Queue中了
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId());//订单id

延时消息

延时消息就是消费者会延时一段时间才会收到消息,是通过特殊的Topic来实现的(Rocket mq中很多都是通过这个实现的,还有像什么死信队列),一共有18个级别

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

在发送消息的时候设置延迟级别就行了

	Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
    // 设置延时等级3,这个消息将在10s之后发送
    message.setDelayTimeLevel(3);
    producer.send(message);
批量消息

需要消息是同一个Topic,且不能是延迟消息,消息的总大小不应该超过4MB

String topic = "BatchTest";
List messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //处理error
}
事务消息

事务消息只是指Producer发送消息生成事务,也就是说和Consumer没关系;作用:保证发送消息和本地事务(数据库事务)具有原子性,也就是说发送消息异常可以使数据库回滚;过程:
1、发送一个half消息(半消息),对Producer是完整的消息,但是对Consumer是不可见的(修改了Topic,老套路了);
2、得到成功发送的结果之后就开始执行本地事务;
3、通知mq本地事务的执行结果(rollback或者commit);
4、如果第3步的通知消息丢失了,mq也会主动回查这个事务的状态;
5、如果事务状态为commit,mq就会修改1中half消息的Topic,是其对Consumer可见;
6、重点在事务监听接口(TransactionListener)的实现;

public class TransactionListenerImpl implements TransactionListener {
  private AtomicInteger transactionIndex = new AtomicInteger(0);
  private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();

	// 本地事务执行的方法
  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
      int value = transactionIndex.getAndIncrement();
      int status = value % 3;
      localTrans.put(msg.getTransactionId(), status);
      return LocalTransactionState.UNKNOW;
  }

// 给mq回结果的方法
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
      Integer status = localTrans.get(msg.getTransactionId());
      if (null != status) {
          switch (status) {
              case 0:
                  return LocalTransactionState.UNKNOW;
              case 1:
                  return LocalTransactionState.COMMIT_MESSAGE;
              case 2:
                  return LocalTransactionState.ROLLBACK_MESSAGE;
          }
      }
      return LocalTransactionState.COMMIT_MESSAGE;
  }
}
消息重试和死信队列 消息重试

正常情况下,消息被消费了之后,Consumer会返回一个消费结果ConsumeConcurrentlyStatus.CONSUME_SUCCESS告知Broker这条消息已经被消费了;但是如果消费不成功,比如业务账号问题或者系统服务异常问题,这个时候就可以返回结果ConsumeConcurrentlyStatus.RECONSUME_LATER,表示这条消息稍后要重试;重试的时长间隔,在broker.conf文件中可配置:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

重试的消息会存放在一个Topic为"%RETRY%+consumerGroup",consumerGroup是消费组的组名,这里也可以看出来消息重试针对的是一整个消费组,而不是某一个Topic;重试级别的实现是MQ会先将重试消息先保存到Topic为"SCHEDULE_TOPIC_XXXX"的延迟队列中(这也是延迟队列的实现方式),等到时间了,再将延迟消息保存到topic为"%RETRY%+consumerGroup"的队列中; 消息重投

Producer也有重试机制,当发送消息时:

同步消息失败会重投,retryTimesWhenSendFailed:同步消息的重投次数,默认为2,也就是说Producer最多重投3次,重投时会选择其他Broker发送,超出最大重投次数会抛出异常;异步消息失败会重试,retryTimesWhenSendAsyncFailed:异步消息的重试次数,仅在同一个Broker上重试,不会选择其他Broker;oneway没有任何保证,所以没有重投机制; 死信队列

当消息重试次数大于最大的重试次数且仍然消费失败的时候,mq也不会把这条消息丢弃,而是把它丢进死信队列;死信队列里的消息对Consumer是不可见的,这个时候需要人工进行干预,重新投递这条消息可以让Consumer继续消费这条消息;

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/748054.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号