一.生产者发送消息我们知道在rocketmq的broker中可以设置autoCreateTopicEnable,当autoCreateTopicEnable = true的时候,如果生产者往broker中发送消息并指定了一个broker中不存在的topic,那么也是可以发送成功的,因为broker会自动地去创建这个不存在的topic,下面我们就来看一下rocketmq中是如何实现的
问题引出:在生产者发送消息之后,首先它需要知道消息要发送的broker地址,而broker的地址属于路由信息的一部分,路由信息是需要生产者根据topic从NameServer中获取的,生产者默认开启了一个定时任务每30s就去NameServer获取到所有的topic路由信息缓存在本地,但是生产者要发送的topic并不在broker集群中存在的话,那么也就不能拿到对应的topic路由信息了,那此时又是如何确定要发送到哪个broker上?
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
我们来到生产者实现类的发送消息方法sendDefaultImpl,可以看到红色框住的第一句代码,这句代码就是从本地或者NameServer中获取要发送的topic的路由信息,如果成功获取到该topic的路由信息,下面就需要去选择一个MessageQueue,也就是队列了,队列里面有具体所属的broker地址,拿到broker地址之后,后面就可以进行消息的发送了。而重点就在于对于不存在的topic,第一句代码到底是如何拿到路由信息的?我们去这个方法看一下
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo
先去生产者的本地缓存中寻找是否有这个topic的路由信息,如果有就返回,否则就从NameServer中获取最新的topic路由信息并且更新到生产者的本地缓存中,然后再从本地缓存中去获取这个topic的路由信息,如果此时还是找不到,那么就从NameServer中获取一个主题名称为TWB102的路由信息,这个TWB102的topic是什么?我们也从来没有去创建过这个topic呀,它是怎么来的呢?其实它就是在broker启动的时候由系统默认创建的系统主题之一(总共9个默认的系统主题),那么我们就需要看下broker的启动的时候这一块做了什么了
org.apache.rocketmq.broker.BrokerController#BrokerController
org.apache.rocketmq.broker.topic.TopicConfigManager#TopicConfigManager(org.apache.rocketmq.broker.BrokerController)
在broker启动的时候,会去初始化各种组件,其中就包括了管理topic配置的组件TopicConfigManager,在TopicConfigManager的构造方法中,会往topicConfigTable表中加入9个默认的TopicConfig,其中会判断当前broker是否开启了自动创建topic的配置,如果开启了就创建一个topic名称为TWB102的主题,这个主题就是我们刚才获取到的,并且这个topic的队列数量等于8
我们回到从NameServer中获取最新的路由信息的方法
当本地和NameServer中都没有这个topic的路由信息的时候,就还会调用获取最新路由信息的这个方法,只是isDefault这个参数传的是true,当这个参数是true的时候,就会进去到第一个if条件判断,而里面做的就是去获取topic名称为TWB102的路由信息。上面我们看到broker启动的时候是向NameServer注册了TWB102这个topic的路由信息的,所以这里是可以获取到的,需要注意的是,当获取到了TWB102的主题路由信息之后,就会把队列数量由8改成了4
这时候获取到了topic路由信息就可以去选择队列去发送消息了
默认从所有的topic队列中轮询选取一个队列进行消息的发送,每一个队列中都标记了自己所属的broker地址,拿到broker地址之后,再调用网络层的api就可以完成消息发送了,我们下面去到broker端看对应的逻辑
二.broker处理消息org.apache.rocketmq.broker.processor.SendMessageProcessor#preSend
org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#msgCheck
broker处理生产者的消息发送逻辑是在SendMessageProcessor这个处理器上面做的,一开始会先对消息进行检查里面的信息是否规范,其中就来到msgCheck方法,在msgCheck方法中会对消息的topic找到对应的topic配置信息,如果找不到,那么就说明这个topic不存在,然后就会去创建这个topic的配置信息
org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod
根据消息的topic名称,队列数(这个队列数上面我们已经看到是4),权限等构建出一个TopicConfig对象并放到topicConfigTable表中,注册定时任务每一次会从topicConfigTable中拿到所有的TopicConfig向NameServer注册路由信息,那么NameServer中就拿到了新创建的topic路由信息了,下一次生产者再向这个topic发送消息的时候就可以直接从本地或者NameServer中拿到了
为什么实际开发中不推荐开启autoCreateTopicEnable?上面我们已经可以知道rocketmq是如何自动创建topic了,但是在实际的使用中我们并不推荐autoCreateTopicEnable开启,为什么呢?这是因为由于在创建新topic的时候生产者是选择了TWB102这个topic的路由信息的其中一个broker去发送消息然后由这个broker去创建新的topic,也就是说这个topic路由信息只包含创建它的这个broker组的地址信息,并没有其他broker组的路由信息,所有当生产者后面继续发送消息的时候,从NameServer中只能获取到这个topic的一个broker组的路由信息,那么这样这个topic就会产生数据倾斜



