请求进入CommonFilter的doFilter方法调用SphU.entry(target, EntryType.IN);经过层层调用最终调用了CtSph的entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object… args)方法。lookProcessChain(resourceWrapper);方法会通过SlotChainBuilder.build方法生成slot链并且获取slot链,这其中就包括了FlowSlot,DegradeSlot
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// Using default context.
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}
// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 通过SlotChainBuilder.build方法生成slot链并且获取slot链
ProcessorSlot
entry方法实现包含固定两部分,第一部分为本环节处理的逻辑,第二部分为调用父类fireEntry方法进行判断是否还有next,next的slot不为null,则继续进行调用。此处以FlowSlot为例
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 进行流控校验
checkFlow(resourceWrapper, context, node, count, prioritized);
// 调用父方法判断是否有next slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
checkFlow经过调用最终会调用checkFlow方法进行流控校验,checkFlow中根据请求资源名称获取对应的规则,遍历规则进行流控判断,如果达到限流规则门限值则抛出异常。
public void checkFlow(Function> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } // 根据资源名称获取对应的限流规则 Collection rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { // 遍历规则判断是否能通过 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } } private final Function > ruleProvider = new Function >() { @Override public Collection apply(String resource) { // 流控规则的map Map > flowRules = FlowRuleManager.getFlowRuleMap(); return flowRules.get(resource); } };
canPassCheck方法判断是否是集群流控,跟据类型选择对应的流控判断方法
public boolean canPassCheck( FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
// 判断是否集群
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// 不是集群则本地校验限流
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
获取规则控制器,分别对应建立流控规则时候的几个模式,其中WarmUpRateLimiterController是隐藏实现,目前sentinel-dashboard中没法选择该模式。
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// 获取规则处理器进行检查判断
// defaultController->直接拒绝模式,
// WarmUpController->预热启动模式,
// RateLimiterController->匀速排队模式
// WarmUpRateLimiterController->预热+匀速排队模式
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
默认快速失败的实现为DefaultController,其中canPass方法首先通过调用avgUsedTokens方法获取到已有请求数,然后加上当前请求数和设置的门限进行对比,如果已有请求数+当前请求>限流数量时表示已经触发限流规则,返回false。
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 获取当前已经请求的数(线程限流获取线程,QPS限流获取当前QPS)
int curCount = avgUsedTokens(node);
// 已有请求数+当前请求>限流数量时则为限流 返回false
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
// 如果是线程限流则获取当前线程数
// 如果是QPS限流返回当前QPS
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}



