栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Sentinel源码解析之限流

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Sentinel源码解析之限流

限流 处理流程

源码追踪

请求进入CommonFilter的doFilter方法调用SphU.entry(target, EntryType.IN);经过层层调用最终调用了CtSph的entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object… args)方法。lookProcessChain(resourceWrapper);方法会通过SlotChainBuilder.build方法生成slot链并且获取slot链,这其中就包括了FlowSlot,DegradeSlot

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        // Using default context.
        context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    }

    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }
    // 通过SlotChainBuilder.build方法生成slot链并且获取slot链
    ProcessorSlot chain = lookProcessChain(resourceWrapper);

    
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        // slot链开始逐个进行处理
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
 

entry方法实现包含固定两部分,第一部分为本环节处理的逻辑,第二部分为调用父类fireEntry方法进行判断是否还有next,next的slot不为null,则继续进行调用。此处以FlowSlot为例

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // 进行流控校验
    checkFlow(resourceWrapper, context, node, count, prioritized);
    // 调用父方法判断是否有next slot
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

checkFlow经过调用最终会调用checkFlow方法进行流控校验,checkFlow中根据请求资源名称获取对应的规则,遍历规则进行流控判断,如果达到限流规则门限值则抛出异常。

public void checkFlow(Function> ruleProvider, ResourceWrapper resource,
            Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        // 根据资源名称获取对应的限流规则
        Collection rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                // 遍历规则判断是否能通过
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

    private final Function> ruleProvider = new Function>() {
        @Override
        public Collection apply(String resource) {
            // 流控规则的map
            Map> flowRules = FlowRuleManager.getFlowRuleMap();
            return flowRules.get(resource);
        }
    };

canPassCheck方法判断是否是集群流控,跟据类型选择对应的流控判断方法

public boolean canPassCheck( FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }

    // 判断是否集群
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    // 不是集群则本地校验限流
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

获取规则控制器,分别对应建立流控规则时候的几个模式,其中WarmUpRateLimiterController是隐藏实现,目前sentinel-dashboard中没法选择该模式。

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }

    // 获取规则处理器进行检查判断
    // defaultController->直接拒绝模式,
    // WarmUpController->预热启动模式,
    // RateLimiterController->匀速排队模式
    // WarmUpRateLimiterController->预热+匀速排队模式
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

默认快速失败的实现为DefaultController,其中canPass方法首先通过调用avgUsedTokens方法获取到已有请求数,然后加上当前请求数和设置的门限进行对比,如果已有请求数+当前请求>限流数量时表示已经触发限流规则,返回false。

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // 获取当前已经请求的数(线程限流获取线程,QPS限流获取当前QPS)
    int curCount = avgUsedTokens(node);
    // 已有请求数+当前请求>限流数量时则为限流 返回false
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}

private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        // 如果是线程限流则获取当前线程数
        // 如果是QPS限流返回当前QPS
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号