Sentinel 系统自适应限流从整体维度对应用入口流量进行控制,结合应用的 Load、CPU 使用率、总体平均 RT、入口 QPS 和并发线程数等几个维度的监控指标,通过自适应的流控策略,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
Sentinel系统自适应限流参考了TCP BBR实现,根据系统能处理的请求和允许进来的请求来做一个平衡,而不是通过一个系统load来做限流,它最终的目的是在系统不被拖垮的情况下,提高系统吞吐量,而不是load一定要低于某个阈值
系统规则系统保护规则是从应用级别的入口流量进行控制,从单台机器的 load、CPU 使用率、平均 RT、入口 QPS 和并发线程数等几个维度监控应用指标,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
系统保护规则是应用整体维度的,而不是资源维度的,并且仅对入口流量生效。入口流量指的是进入应用的流量(EntryType.IN),比如 Web 服务或 Dubbo 服务端接收的请求,都属于入口流量。
| 指标 | 说明 |
| Load 自适应(仅对 Linux/Unix-like 机器生效) | 系统的 load1 作为启发指标,进行自适应系统保护。当系统 load1 超过设定的启发值,且系统当前的并发线程数超过估算的系统容量时才会触发系统保护(BBR 阶段)。系统容量由系统的 maxQps * minRt 估算得出。设定参考值一般是 CPU cores * 2.5。 |
| CPU usage(1.5.0+ 版本) | 当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0),比较灵敏。 |
| 平均 RT | 当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。 |
| 并发线程数 | 当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。 |
| 入口 QPS | 当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。 |
我们把系统处理请求的过程想象为一个水管,到来的请求是往这个水管灌水,当系统处理顺畅的时候,请求不需要排队,直接从水管中穿过,这个请求的RT是最短的;反之,当请求堆积的时候,那么处理请求的时间则会变为:排队时间 + 最短处理时间。
- 如果我们能够保证水管里的水量,能够让水顺畅的流动,则不会增加排队的请求;也就是说,这个时候的系统负载不会进一步恶化。
我们用 T 来表示(水管内部的水量),用RT来表示请求的处理时间,用P来表示进来的请求数,那么一个请求从进入水管道到从水管出来,这个水管会存在 P * RT 个请求。换一句话来说,当 T ≈ QPS * Avg(RT) 的时候,我们可以认为系统的处理能力和允许进入的请求个数达到了平衡,系统的负载不会进一步恶化。
接下来的问题是,水管的水位是可以达到了一个平衡点,但是这个平衡点只能保证水管的水位不再继续增高,但是还面临一个问题,就是在达到平衡点之前,这个水管里已经堆积了多少水。如果之前水管的水已经在一个量级了,那么这个时候系统允许通过的水量可能只能缓慢通过,RT会大,之前堆积在水管里的水会滞留;反之,如果之前的水管水位偏低,那么又会浪费了系统的处理能力。
- 当保持入口的流量是水管出来的流量的最大的值的时候,可以最大利用水管的处理能力。
1.自适应限流使用例子
public class SystemGuardDemo {
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 100;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
tick();
//初始化规则参数
initSystemRule();
//开启线程执行流量调用
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Entry entry = null;
try {
entry = SphU.entry("methodA", EntryType.IN);
pass.incrementAndGet();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
// ignore
}
} catch (BlockException e1) {
block.incrementAndGet();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
// ignore
}
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
}
}
});
entryThread.setName("working-thread");
entryThread.start();
}
}
private static void initSystemRule() {
List rules = new ArrayList();
SystemRule rule = new SystemRule();
// max load is 3
rule.setHighestSystemLoad(3.0);
// max cpu usage is 60%
rule.setHighestCpuUsage(0.6);
// max avg rt of all request is 10 ms
rule.setAvgRt(10);
// max total qps is 20
rule.setQps(20);
// max parallel working thread is 10
rule.setMaxThread(10);
rules.add(rule);
SystemRuleManager.loadRules(Collections.singletonList(rule));
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + ", " + TimeUtil.currentTimeMillis() + ", total:"
+ oneSecondTotal + ", pass:"
+ oneSecondPass + ", block:" + oneSecondBlock);
if (seconds-- <= 0) {
stop = true;
}
}
System.exit(0);
}
}
}
2.自适应限流入口
2.1 SystemSlot
@SpiOrder(-5000) public class SystemSlot extends AbstractLinkedProcessorSlot{ @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { SystemRuleManager.checkSystem(resourceWrapper);//@1 fireEntry(context, resourceWrapper, node, count, prioritized, args);//@2 } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } }
代码@1:entry方法中调用SystemRuleManager.checkSystem方法,这里是自适应限流的关键点。
代码@2:在职责链上继续调用下一个slot节点。
2.1 SystemRuleManager
2.1.1 类图
sentinel自适应限流通过SystemRuleManager类来实现,它里面封装了BBR算法的实现,以及系统指标的采集,接下来我们看下它的类图以及核心属性。
我们来看下SystemRuleManager的一些属性:
//系统最大负载 private static volatile double highestSystemLoad = Double.MAX_VALUE; //CPU使用率,介于[0,1]之间 private static volatile double highestCpuUsage = Double.MAX_VALUE; private static volatile double qps = Double.MAX_VALUE; //最大延迟 private static volatile long maxRt = Long.MAX_VALUE; //最大线程数 private static volatile long maxThread = Long.MAX_VALUE; //采集系统cpu load、cpu使用率的实现。 private static SystemStatusListener statusListener = null;
2.1.2 核心方法源码分析
checkSystem
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
//@1
if (resourceWrapper == null) {
return;
}
//@2
// Ensure the checking switch is on.
if (!checkSystemStatus.get()) {
return;
}
//@3
// for inbound traffic only
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
//@4
// total qps
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
//@5
// total thread
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
//@6
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
//@7
// load. BBR algorithm.
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
//@8
// cpu usage
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
//bbr算法
private static boolean checkBbr(int currentThread) {
if (currentThread > 1 &&
currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
return false;
}
return true;
}
代码@1:检查资源是否为空,如果为空直接返回。
代码@2:判断系统自适应限流是否开启,未开启直接返回。
代码@3:判断资源的流量是否为入口流量,Sentinel系统自适应限流只对入口流量生效。
代码@4:从Constants.ENTRY_NODE获取当前qps,如果当前qps大于SystemRule配置的阈值,直接抛SystemBlockException异常
代码@5:从Constants.ENTRY_NODE获取当前thread,如果当前thread大于SystemRule配置的阈值,直接抛SystemBlockException 异常
代码@6:从Constants.ENTRY_NODE获取当前avgRT,如果当前avgRT大于SystemRule配置的阈值,直接抛SystemBlockException异常
代码@7:进行bbr算法校验:
- 校验系统负载开关是否打开,当前系统load是否大于配置的系统load,如果都满足则继续校验
- 调用checkBbr方法,之前我们有说过系统通过流量:T ≈ QPS * Avg(RT)时我们可以认为系统的处理能力和允许进入的请求个数达到了平衡,所以checkBbr方法计算的公式以秒为单位:T=QPS*RT/1000。如果当前线程数大于T,则进行拦截。
代码@8:判断当前CPU使用率是否大于SystemRule配置的阈值,如果是则抛出SystemBlockException异常
Constants.ENTRY_NODE
自适应限流使用的是全局的ClusterNode节点,这就是说自适应限流的维度是整个系统。
public final static ClusterNode ENTRY_NODE = new ClusterNode(TOTAL_IN_RESOURCE_NAME, ResourceTypeConstants.COMMON);
系统指标采集运行
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-system-status-record-task", true));
static {
checkSystemStatus.set(false);
statusListener = new SystemStatusListener();
scheduler.scheduleAtFixedRate(statusListener, 0, 1, TimeUnit.SECONDS);
currentProperty.addListener(listener);
}
SystemRuleManager类中定义了ScheduledExecutorService线程池,在静态块里面触发SystemStatusListener类的运行,运行时间是1秒钟一次,这表示Sentinel的自适应保护信息采集为1秒钟采集系统load、cpu信息。
2.2.1系统指标采集源码分析
SystemStatusListener 类图:
该类实现了runnable接口,他通过一个线程每隔一秒执行一次load、cpu usage信息的采集。
2.2.2 源码分析
public class SystemStatusListener implements Runnable {
volatile double currentLoad = -1;
volatile double currentCpuUsage = -1;
volatile String reason = StringUtil.EMPTY;
volatile long processCpuTime = 0;
volatile long processUpTime = 0;
public double getSystemAverageLoad() {
return currentLoad;
}
public double getCpuUsage() {
return currentCpuUsage;
}
@Override
public void run() {
try {
//@1
OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
currentLoad = osBean.getSystemLoadAverage();//@2
double systemCpuUsage = osBean.getSystemCpuLoad();//@3
// calculate process cpu usage to support application running in container environment
RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
long newProcessCpuTime = osBean.getProcessCpuTime();//@4
long newProcessUpTime = runtimeBean.getUptime();//@5
int cpuCores = osBean.getAvailableProcessors();//@6
long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
.toMillis(newProcessCpuTime - processCpuTime);
long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;//@7
processCpuTime = newProcessCpuTime;
processUpTime = newProcessUpTime;
currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);
if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
writeSystemStatusLog();
}
} catch (Throwable e) {
RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
}
}
}
代码@1:Sentinel通过jdk:ManagementFactory类获取系统load、cpu等信息。
代码@2:获取当前系统load。
代码@3:获取当前系统cpu usage。
代码@4:获取系统cpu运行时间。
代码@5:获取当前jvm cpu运行时间。
代码@6:获取系统cpu核心数。
代码@7:计算CPU使用率。
总结经过以上源码分析,我们可以得出几点结论:
-
- Sentinel自适应限流原理采用了BBR算法
- Sentinel自适应限流维度是整个系统,当系统负载过大时会触发限流。
- Sentinel自适应限流信息采集指标为1秒钟一次。
- Sentinel采集的指标是操作系统级别的,所以要使用自适应限流的话,建议应用服务器不要部署其他负载很高的服务。
参考文章:
https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81



