栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > 架构设计

Setinel源码阅读:系统自适应保护

架构设计 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Setinel源码阅读:系统自适应保护

概述

Sentinel 系统自适应限流从整体维度对应用入口流量进行控制,结合应用的 Load、CPU 使用率、总体平均 RT、入口 QPS 和并发线程数等几个维度的监控指标,通过自适应的流控策略,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。

Sentinel系统自适应限流参考了TCP BBR实现,根据系统能处理的请求和允许进来的请求来做一个平衡,而不是通过一个系统load来做限流,它最终的目的是在系统不被拖垮的情况下,提高系统吞吐量,而不是load一定要低于某个阈值

系统规则

系统保护规则是从应用级别的入口流量进行控制,从单台机器的 load、CPU 使用率、平均 RT、入口 QPS 和并发线程数等几个维度监控应用指标,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。

系统保护规则是应用整体维度的,而不是资源维度的,并且仅对入口流量生效。入口流量指的是进入应用的流量(EntryType.IN),比如 Web 服务或 Dubbo 服务端接收的请求,都属于入口流量。

指标

说明

Load 自适应(仅对 Linux/Unix-like 机器生效)

系统的 load1 作为启发指标,进行自适应系统保护。当系统 load1 超过设定的启发值,且系统当前的并发线程数超过估算的系统容量时才会触发系统保护(BBR 阶段)。系统容量由系统的 maxQps * minRt 估算得出。设定参考值一般是 CPU cores * 2.5。

CPU usage(1.5.0+ 版本)

当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0),比较灵敏。

平均 RT

当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。

并发线程数

当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。

入口 QPS

当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。

原理

我们把系统处理请求的过程想象为一个水管,到来的请求是往这个水管灌水,当系统处理顺畅的时候,请求不需要排队,直接从水管中穿过,这个请求的RT是最短的;反之,当请求堆积的时候,那么处理请求的时间则会变为:排队时间 + 最短处理时间。

  • 如果我们能够保证水管里的水量,能够让水顺畅的流动,则不会增加排队的请求;也就是说,这个时候的系统负载不会进一步恶化。

我们用 T 来表示(水管内部的水量),用RT来表示请求的处理时间,用P来表示进来的请求数,那么一个请求从进入水管道到从水管出来,这个水管会存在 P * RT 个请求。换一句话来说,当 T ≈ QPS * Avg(RT) 的时候,我们可以认为系统的处理能力和允许进入的请求个数达到了平衡,系统的负载不会进一步恶化。

接下来的问题是,水管的水位是可以达到了一个平衡点,但是这个平衡点只能保证水管的水位不再继续增高,但是还面临一个问题,就是在达到平衡点之前,这个水管里已经堆积了多少水。如果之前水管的水已经在一个量级了,那么这个时候系统允许通过的水量可能只能缓慢通过,RT会大,之前堆积在水管里的水会滞留;反之,如果之前的水管水位偏低,那么又会浪费了系统的处理能力。

  • 当保持入口的流量是水管出来的流量的最大的值的时候,可以最大利用水管的处理能力。
源码分析

1.自适应限流使用例子

public class SystemGuardDemo {

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();

    private static volatile boolean stop = false;
    private static final int threadCount = 100;

    private static int seconds = 60 + 40;

    public static void main(String[] args) throws Exception {
        
        tick();
        //初始化规则参数
        initSystemRule();
        //开启线程执行流量调用
        for (int i = 0; i < threadCount; i++) {
            Thread entryThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        Entry entry = null;
                        try {
                            entry = SphU.entry("methodA", EntryType.IN);
                            pass.incrementAndGet();
                            try {
                                TimeUnit.MILLISECONDS.sleep(20);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        } catch (BlockException e1) {
                            block.incrementAndGet();
                            try {
                                TimeUnit.MILLISECONDS.sleep(20);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        } catch (Exception e2) {
                            // biz exception
                        } finally {
                            total.incrementAndGet();
                            if (entry != null) {
                                entry.exit();
                            }
                        }
                    }
                }

            });
            entryThread.setName("working-thread");
            entryThread.start();
        }
    }

    private static void initSystemRule() {
        List rules = new ArrayList();
        SystemRule rule = new SystemRule();
        // max load is 3
        rule.setHighestSystemLoad(3.0);
        // max cpu usage is 60%
        rule.setHighestCpuUsage(0.6);
        // max avg rt of all request is 10 ms
        rule.setAvgRt(10);
        // max total qps is 20
        rule.setQps(20);
        // max parallel working thread is 10
        rule.setMaxThread(10);

        rules.add(rule);
        SystemRuleManager.loadRules(Collections.singletonList(rule));
    }

    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {
        @Override
        public void run() {
            System.out.println("begin to statistic!!!");
            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + ", " + TimeUtil.currentTimeMillis() + ", total:"
                    + oneSecondTotal + ", pass:"
                    + oneSecondPass + ", block:" + oneSecondBlock);
                if (seconds-- <= 0) {
                    stop = true;
                }
            }
            System.exit(0);
        }
    }
}

2.自适应限流入口

2.1 SystemSlot

@SpiOrder(-5000)
public class SystemSlot extends AbstractLinkedProcessorSlot {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        SystemRuleManager.checkSystem(resourceWrapper);//@1
        fireEntry(context, resourceWrapper, node, count, prioritized, args);//@2
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

}

代码@1:entry方法中调用SystemRuleManager.checkSystem方法,这里是自适应限流的关键点。

代码@2:在职责链上继续调用下一个slot节点。

2.1 SystemRuleManager

2.1.1 类图

sentinel自适应限流通过SystemRuleManager类来实现,它里面封装了BBR算法的实现,以及系统指标的采集,接下来我们看下它的类图以及核心属性。

我们来看下SystemRuleManager的一些属性:

//系统最大负载
private static volatile double highestSystemLoad = Double.MAX_VALUE;

 //CPU使用率,介于[0,1]之间
private static volatile double highestCpuUsage = Double.MAX_VALUE;
private static volatile double qps = Double.MAX_VALUE;
//最大延迟
private static volatile long maxRt = Long.MAX_VALUE;
//最大线程数
private static volatile long maxThread = Long.MAX_VALUE;
//采集系统cpu load、cpu使用率的实现。
private static SystemStatusListener statusListener = null;

2.1.2 核心方法源码分析

checkSystem

public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
    //@1
    if (resourceWrapper == null) {
        return;
    }
    //@2
    // Ensure the checking switch is on.
    if (!checkSystemStatus.get()) {
        return;
    }
    //@3
    // for inbound traffic only
    if (resourceWrapper.getEntryType() != EntryType.IN) {
        return;
    }
    //@4
    // total qps
    double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
    if (currentQps > qps) {
        throw new SystemBlockException(resourceWrapper.getName(), "qps");
    }
    //@5
    // total thread
    int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
    if (currentThread > maxThread) {
        throw new SystemBlockException(resourceWrapper.getName(), "thread");
    }
    //@6
    double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
    if (rt > maxRt) {
        throw new SystemBlockException(resourceWrapper.getName(), "rt");
    }
    //@7
    // load. BBR algorithm.
    if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
        if (!checkBbr(currentThread)) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }
    }
    //@8
    // cpu usage
    if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
        throw new SystemBlockException(resourceWrapper.getName(), "cpu");
    }
}

//bbr算法
private static boolean checkBbr(int currentThread) {
    if (currentThread > 1 &&
        currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
        return false;
    }
    return true;
}

代码@1:检查资源是否为空,如果为空直接返回。

代码@2:判断系统自适应限流是否开启,未开启直接返回。

代码@3:判断资源的流量是否为入口流量,Sentinel系统自适应限流只对入口流量生效。

代码@4:从Constants.ENTRY_NODE获取当前qps,如果当前qps大于SystemRule配置的阈值,直接抛SystemBlockException异常

代码@5:从Constants.ENTRY_NODE获取当前thread,如果当前thread大于SystemRule配置的阈值,直接抛SystemBlockException 异常

代码@6:从Constants.ENTRY_NODE获取当前avgRT,如果当前avgRT大于SystemRule配置的阈值,直接抛SystemBlockException异常

代码@7:进行bbr算法校验:

  1. 校验系统负载开关是否打开,当前系统load是否大于配置的系统load,如果都满足则继续校验
  2. 调用checkBbr方法,之前我们有说过系统通过流量:T ≈ QPS * Avg(RT)时我们可以认为系统的处理能力和允许进入的请求个数达到了平衡,所以checkBbr方法计算的公式以秒为单位:T=QPS*RT/1000。如果当前线程数大于T,则进行拦截。

代码@8:判断当前CPU使用率是否大于SystemRule配置的阈值,如果是则抛出SystemBlockException异常

Constants.ENTRY_NODE

自适应限流使用的是全局的ClusterNode节点,这就是说自适应限流的维度是整个系统。

public final static ClusterNode ENTRY_NODE = new ClusterNode(TOTAL_IN_RESOURCE_NAME, ResourceTypeConstants.COMMON);

系统指标采集运行

@SuppressWarnings("PMD.ThreadPoolCreationRule")
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
    new NamedThreadFactory("sentinel-system-status-record-task", true));

static {
    checkSystemStatus.set(false);
    statusListener = new SystemStatusListener();
    scheduler.scheduleAtFixedRate(statusListener, 0, 1, TimeUnit.SECONDS);
    currentProperty.addListener(listener);
}

SystemRuleManager类中定义了ScheduledExecutorService线程池,在静态块里面触发SystemStatusListener类的运行,运行时间是1秒钟一次,这表示Sentinel的自适应保护信息采集为1秒钟采集系统load、cpu信息。

2.2.1系统指标采集源码分析

SystemStatusListener 类图:

该类实现了runnable接口,他通过一个线程每隔一秒执行一次load、cpu usage信息的采集。

2.2.2 源码分析

public class SystemStatusListener implements Runnable {

    volatile double currentLoad = -1;
    volatile double currentCpuUsage = -1;

    volatile String reason = StringUtil.EMPTY;

    volatile long processCpuTime = 0;
    volatile long processUpTime = 0;

    public double getSystemAverageLoad() {
        return currentLoad;
    }

    public double getCpuUsage() {
        return currentCpuUsage;
    }

    @Override
    public void run() {
        try {
            //@1
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            currentLoad = osBean.getSystemLoadAverage();//@2

            
            double systemCpuUsage = osBean.getSystemCpuLoad();//@3

            // calculate process cpu usage to support application running in container environment
            RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
            long newProcessCpuTime = osBean.getProcessCpuTime();//@4
            long newProcessUpTime = runtimeBean.getUptime();//@5
            int cpuCores = osBean.getAvailableProcessors();//@6
            long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
                    .toMillis(newProcessCpuTime - processCpuTime);
            long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
            double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;//@7
            processCpuTime = newProcessCpuTime;
            processUpTime = newProcessUpTime;

            currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);

            if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
                writeSystemStatusLog();
            }
        } catch (Throwable e) {
            RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
        }
    }
}

代码@1:Sentinel通过jdk:ManagementFactory类获取系统load、cpu等信息。

代码@2:获取当前系统load。

代码@3:获取当前系统cpu usage。

代码@4:获取系统cpu运行时间。

代码@5:获取当前jvm cpu运行时间。

代码@6:获取系统cpu核心数。

代码@7:计算CPU使用率。

总结

经过以上源码分析,我们可以得出几点结论:

    • Sentinel自适应限流原理采用了BBR算法
    • Sentinel自适应限流维度是整个系统,当系统负载过大时会触发限流。
    • Sentinel自适应限流信息采集指标为1秒钟一次。
    • Sentinel采集的指标是操作系统级别的,所以要使用自适应限流的话,建议应用服务器不要部署其他负载很高的服务。

参考文章:

https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/912726.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号