栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

全站最硬核 百万字强肝RocketMq源码 火热更新中~(二十三)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

全站最硬核 百万字强肝RocketMq源码 火热更新中~(二十三)

进来首先,如果channel不为空,则获取读锁,这里注意RouteInfoManager内部是声明了读写锁的:

private final ReadWriteLock lock = new ReentrantReadWriteLock();

获取锁成功后,接下来从brokerLiveTable(在线的broker表)中遍历出value(BrokerLiveInfo)的channel为目标channel的key作为brokerAddrFound。也就是说第一个if,就是从brokerLiveTable中取出一个key:brokerAddrFound(broker的地址)

整个方法的主要部分都集中在对这个取出的key:brokerAddrFound的处理上。虽然代码很长,但整体逻辑并不复杂,都是删除key为brokerAddrFound的各种表中的元素:(合法化处理还有异常捕获这里就不分析了)

this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);

获取读锁(增删改都是取读锁),先从brokerLiveTable,filterServerTable中移除key为broker地址的元素。这也符合该方法的逻辑:channelDestory。

while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
    BrokerData brokerData = itBrokerAddrTable.next().getValue();

    Iterator> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                brokerId, brokerAddr);
            break;
        }
    }

    if (brokerData.getBrokerAddrs().isEmpty()) {
        removeBrokerName = true;
        itBrokerAddrTable.remove();
        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
            brokerData.getBrokerName());
    }
}

从brokerData中移除

if (brokerNameFound != null && removeBrokerName) {
    Iterator>> it = this.clusterAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry> entry = it.next();
        String clusterName = entry.getKey();
        Set brokerNames = entry.getValue();
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                brokerNameFound, clusterName);

            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                    clusterName);
                it.remove();
            }

            break;
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711811.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号