栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rocketmq生产者发送到哪个队列

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rocketmq生产者发送到哪个队列

C-DefaultMQProducer
C-DefaultMQProducerImpl method(sendDefaultImpl-sendKernelImpl-)
M-sendKernelImpl
C-MQClientInstance
-MQClientAPIImpl-M(sendMessage-) 构建-RemotingCommand(1.构建请求头 2.填充消息体)
-NettyRemotingClient
-NettyRemotingAbstract-M(invokeSyncImpl);最终发送消息抽象类.组装 RemotingCommand-)

1.找路由,Producer从namesrv获取的到Topic_A路由信息TopicPublishInfo

2.找队列,随机算法

RocketMQ(六)发消息的时候选择queue的算法有哪些? - 苏先生139 - 博客园

3.发消息,NettyRemotingClient NettyRemotingAbstract 通讯原理netty发送

RocketMQ原理解析-producer 2.如何发送消息_斩秋的专栏-CSDN博客_rocketmq发送消息原理

RocketMQ原理学习---生产者普通消息发送_井底之蛙-CSDN博客

第一次发消息,topictable缓存无 TopicPublishInfo 信息,此时客户端要与nameServer交互获取topic对应broker信息,至此知道了发送到哪个broker

TopicPublishInfo 信息

 

ThreadLocalIndex

最终的选择队列算法,random.nextInt(),算一个随机数,用随机数对队列总和取余

ThreadLocal在这里啥意思?随机数,线程隔离

自己测试每次创建producer都是不同线程,每次都是生产新的随机数,显然这样算法不好,实际场景应该是一个生产者线程,或者比较少,取出之前值加1,取到下一个队列。生产者到底几个线程?

public class ThreadLocalIndex {
    private final ThreadLocal threadLocalIndex = new ThreadLocal();
    private final Random random = new Random();

    public int incrementAndGet() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            this.threadLocalIndex.set(index);
        }

        this.threadLocalIndex.set(++index);
        return Math.abs(index);
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            '}';
    }
}
    public MessageQueue selectoneMessageQueue() {
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

最终发送消息的抽象类

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/606729.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号