栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RocketMQ

RocketMQ

前提条件

1、NameServer已启动。各个NameServer无状态,无通信。
2、broker已启动。

3、broker启动时,加载$HOME/store/config/topics.json里的信息,定时注册到3个NameServer里。

{
	"dataVersion":{
		"counter":3,
		"timestamp":1647074381689
	},
	"topicConfigTable":{
		"SCHEDULE_TOPIC_XXXX":{
			"order":false,
			"perm":6,
			"readQueueNums":18,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"SCHEDULE_TOPIC_XXXX",
			"topicSysFlag":0,
			"writeQueueNums":18
		},
		"SELF_TEST_TOPIC":{
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"SELF_TEST_TOPIC",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"%RETRY%testGroupName":{
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"%RETRY%testGroupName",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"DefaultCluster":{
			"order":false,
			"perm":7,
			"readQueueNums":16,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"DefaultCluster",
			"topicSysFlag":0,
			"writeQueueNums":16
		},
		"DefaultCluster_REPLY_TOPIC":{
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"DefaultCluster_REPLY_TOPIC",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"RMQ_SYS_TRANS_HALF_TOPIC":{
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"RMQ_SYS_TRANS_HALF_TOPIC",
			"topicSysFlag":0,
			"writeQueueNums":1
		},
		"TBW102":{
			"order":false,
			"perm":7,
			"readQueueNums":8,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"TBW102",
			"topicSysFlag":0,
			"writeQueueNums":8
		},
		"BenchmarkTest":{
			"order":false,
			"perm":6,
			"readQueueNums":1024,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"BenchmarkTest",
			"topicSysFlag":0,
			"writeQueueNums":1024
		},
		"OFFSET_MOVED_EVENT":{
			"order":false,
			"perm":6,
			"readQueueNums":1,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"OFFSET_MOVED_EVENT",
			"topicSysFlag":0,
			"writeQueueNums":1
		}
	}
}

4、由于所有NameServer节点里的路由信息都一致,所以producer选择任一一个NameServer与之通信,获得topic路由信息。

Producer发送消息源码分析
DefaultMQProducer producer = new DefaultMQProducer(Constants.TEST_GROUP_NAME);
        producer.setNamesrvAddr(Constants.NAME_SERVER);
        producer.start();
Message msg = new Message(Constants.TEST_TOPIC_NAME,
	Constants.TEST_TAG, "key" + i,(Constants.HELLO_ROCKETMQ + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
producer启动主要流程

sendoneWay

start()方法初始化, sendOneWay拼装、发送消息。

调用tryToFindTopicPublishInfo方法,从topicPublishInfoTable中寻找topic路由信息。

Producer启动的时候,先向NameServer发起请求,询问topic路由信息。
如果找不到,调用updateTopicRouteInfoFromNameServer从NameServer获取topic路由信息。
如果找到了,调用updateTopicRouteInfoFromNameServer更新本地的topic路由信息。

   private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
       TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
       if (null == topicPublishInfo || !topicPublishInfo.ok()) {
           this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
       }

       if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
           return topicPublishInfo;
       } else {
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
           return topicPublishInfo;
       }
   }
topicPublishInfoTable是什么结构?
private final ConcurrentMap topicPublishInfoTable =
        new ConcurrentHashMap();

不同于broker的topic信息,client启动的时候,默认只有一个TBW102的topic。

updateTopicRouteInfoFromNameServer做了什么?

通过getDefaultTopicRouteInfoFromNameServer从NameServer获取topic信息T1,将T和和本地topicRouteTable中的topic(T2)比较,看是否需要更新。
T1结构一览:

  public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
      DefaultMQProducer defaultMQProducer) {
      try {
          if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
              try {
                  TopicRouteData topicRouteData;
                  if (isDefault && defaultMQProducer != null) {
                      topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                          clientConfig.getMqClientApiTimeout());
                      if (topicRouteData != null) {
                          for (QueueData data : topicRouteData.getQueueDatas()) {
                              int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                              data.setReadQueueNums(queueNums);
                              data.setWriteQueueNums(queueNums);
                          }
                      }
                  } else {
                      topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                  }
                  if (topicRouteData != null) {
                      TopicRouteData old = this.topicRouteTable.get(topic);
                      boolean changed = topicRouteDataIsChange(old, topicRouteData);
                      if (!changed) {
                          changed = this.isNeedUpdateTopicRouteInfo(topic);
                      } else {
                          log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                      }

                      if (changed) {
                          TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                          for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                              this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                          }
						…………
                          log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                          this.topicRouteTable.put(topic, cloneTopicRouteData);
                          return true;
                      }
  }
getTopicRouteInfoFromNameServer做了什么?

封装了一个RequestCode.GET_ROUTEINFO_BY_TOPIC的Netty command。

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
      boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
      GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
      requestHeader.setTopic(topic);
	  //******************look at me plz********************************************
      RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

      RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
      assert response != null;
      switch (response.getCode()) {
          case ResponseCode.TOPIC_NOT_EXIST: {
              if (allowTopicNotExist) {
                  log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
              }

              break;
          }
          case ResponseCode.SUCCESS: {
              byte[] body = response.getBody();
              if (body != null) {
                  return TopicRouteData.decode(body, TopicRouteData.class);
              }
          }
          default:
              break;
      }

      throw new MQClientException(response.getCode(), response.getRemark());
  }

Netty封装通用调用方法

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    //******************look at me plz********************************************
    final Channel channel = this.getAndCreateChannel(addr);
    //**************************************************************
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

这2个方法,就是从NameServer的list中获取1个NameServer,如果超时,轮询下一个。

private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }

    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    return this.createChannel(addr);
}
private Channel getAndCreateNameserverChannel() throws RemotingConnectException, InterruptedException {
      String addr = this.namesrvAddrChoosed.get();
      if (addr != null) {
          ChannelWrapper cw = this.channelTables.get(addr);
          if (cw != null && cw.isOK()) {
              return cw.getChannel();
          }
      }

      final List addrList = this.namesrvAddrList.get();
      if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
          try {
              addr = this.namesrvAddrChoosed.get();
              if (addr != null) {
                  ChannelWrapper cw = this.channelTables.get(addr);
                  if (cw != null && cw.isOK()) {
                      return cw.getChannel();
                  }
              }
    		//******************look at me plz********************************************
              if (addrList != null && !addrList.isEmpty()) {
                  for (int i = 0; i < addrList.size(); i++) {
                      int index = this.namesrvIndex.incrementAndGet();
                      index = Math.abs(index);
                      index = index % addrList.size();
                      String newAddr = addrList.get(index);

                      this.namesrvAddrChoosed.set(newAddr);
                      log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                      Channel channelNew = this.createChannel(newAddr);
                      if (channelNew != null) {
                          return channelNew;
                      }
                  }
                  throw new RemotingConnectException(addrList.toString());
              }
          } finally {
              this.namesrvChannelLock.unlock();
          }
      } else {
          log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
      }

      return null;
  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761931.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号