栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

数据结构与算法的深入应用-限流算法

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

数据结构与算法的深入应用-限流算法

限流是对系统的一种保护措施。即限制请求的频率(每秒处理多少个请求)。一般来说当请求频率大于系统的瓶颈就需要丢弃超过系统瓶颈的请求,从而来保证系统的可用性。系统一旦接受请求就可以保证得到正常的服务。限流算法常见有以下几种:

1,计数器

java实现

package com.xieq.algorithm.limit;

import java.util.Date;
import java.util.concurrent.*;


public class Counter {
    public static void main(String[] args) {

        Semaphore semaphore = new Semaphore(3);
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(() -> {
                semaphore.release(3);
        }, 3000, 3000, TimeUnit.MILLISECONDS);

        while (true){
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new Date()+"---ok");
        }

    }
}

运行输出如下:

可见请求是以三个未一组进行服务的。这里通过信号量(Semaphore)简单的实现计数器限流算法。
优缺点:
实现简单;
控制力度过于简略,加入1秒内限制3次,那么3次在前100毫秒,后面的900毫秒就只能处于阻塞状态。
应用:
计数器限流算法应用的场景比较少,因为他的处理逻辑不够灵活。如果网站计数器算法限流,那么攻击者可以吃掉前100毫秒的浏览计数,正常的请求则得不到执行。

2,漏斗桶

1)漏斗桶算法将请求缓存在漏斗中,系统匀速的处理请求,超过缓存的那一部分请求将被丢弃。

2)实现

package com.xieq.algorithm.limit;

import java.util.concurrent.linkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.Executors.*;


public class Barrel {

    private static final linkedBlockingQueue QUEUE = new linkedBlockingQueue<>(3);

    public static void main(String[] args) {
        ScheduledExecutorService service = newScheduledThreadPool(1);
        //模拟系统的性能瓶颈,每2秒处理一个请求,
        service.scheduleAtFixedRate(() -> {
            Integer i = QUEUE.poll();
            System.out.println("接受请求" + i + "并处理。。。");
        }, 2000, 2000, TimeUnit.MILLISECONDS);

        //模拟请求,每一秒发出一个请求
        int i = 0;
        while (true) {
            i++;
            try {
                //如果一秒放不进队列(队列已满),则会丢弃当前请求
                boolean put = QUEUE.offer(i, 1000, TimeUnit.MILLISECONDS);
                if (put) {
                    System.out.println("put:" + i);
                } else {
                    System.out.println("drop:" + i);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

运行输出如下:

put:1
put:2
put:3
drop:4
drop:5
put:6
接受请求1并处理。。。
drop:7
接受请求2并处理。。。
put:8
drop:9
接受请求3并处理。。。
执行任务匀速的1秒处理一个请求;超过的请求被丢弃。

4)优缺点:
有效的挡住了外部请求,保护的内部的服务不会过载;
内部服务匀速处理,不能效应流量洪峰,无法做到弹性处理系统请求;
任务超时溢出时被遗弃,现实总可能需要酌情处理。
5)应用:
nginx中的限流是漏斗桶算法的经典应用。

http {
#$binary_remote_addr 表示通过remote_addr这个标识来做key,也就是限制同一客户端ip地址。
#zone=one:10m 表示生成一个大小为10M,名字为one的内存区域,用来存储访问的频次信息。
#rate=1r/s 表示允许相同标识的客户端每秒1次访问
limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;
server {
location /limited/ {
#zone=one 与上面limit_req_zone 里的name对应。
#burst=5 缓冲区,超过了访问频次限制的请求可以先放到这个缓冲区内,类似代码中的队列长度。
#nodelay 如果设置,超过访问频次而且缓冲区也满了的时候就会直接返回503,如果没有设置,则所有请求
会等待排队,类似代码中的put还是offer。
limit_req zone=one burst=5 nodelay;
}
}

3,令牌桶

1)概述
令牌桶在漏斗桶的基础上解决了漏斗桶不能处理流量洪峰的问题。令牌桶匀速的颁布令牌并将颁布的令牌缓存起来,请求到达时先获取令牌再执行业务逻辑。

2)实现

package com.xieq.algorithm.limit;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;


public class Token {
    public static void main(String[] args) throws Exception {
        Semaphore semaphore = new Semaphore(3);
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        //每一秒颁发一个令牌
        service.scheduleAtFixedRate(() -> {
            if (semaphore.availablePermits() < 3) {
                semaphore.release();
            }

        }, 1000, 1000, TimeUnit.MILLISECONDS);

        //等待5令牌颁发完成
        TimeUnit.MILLISECONDS.sleep(5000);

        for (int i = 0; i < 5; i++) {
            semaphore.acquire();
            System.out.println(new Date() + "洪峰:" + i);
        }

        for (int i = 0; i < 3; i++) {
            TimeUnit.MILLISECONDS.sleep(2000);
            semaphore.acquire();
            System.out.println(new Date() + "正常:" + i);
        }

    }
}

输出如下:

Mon Jan 17 10:40:51 CST 2022洪峰:0
Mon Jan 17 10:40:51 CST 2022洪峰:1
Mon Jan 17 10:40:51 CST 2022洪峰:2
Mon Jan 17 10:40:51 CST 2022洪峰:3
Mon Jan 17 10:40:52 CST 2022洪峰:4
Mon Jan 17 10:40:54 CST 2022正常:0
Mon Jan 17 10:40:56 CST 2022正常:1
Mon Jan 17 10:40:58 CST 2022正常:2

4,滑动窗口

1)概述
滑动窗口弥补了计数器的不足,计数器限流只是粗暴的限制了一分钟了的请求数而滑动窗口限流可以将一分钟细分为若干小的时间片,不但要求整个时间窗口不能超过请求的上限也要求每个时间片不能超过请求的上限,从而不会出现像计数器限流那样前面一秒钟把计数流量打满,后面来的请求无法服务的情况。

2)代码实现

package com.xieq.algorithm.limit;

import java.util.linkedList;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


public class Window {
    
    private static final Integer TOTAL_MAX = 5;
    
    private static final Integer SILENCE_MAX = 5;

    
    private static final Integer SILENCE = 3;

    
    private static final linkedList QUEUE = new linkedList<>();
    
    private static final Map MAP = new TreeMap<>();

    
    ScheduledExecutorService service = Executors.newScheduledThreadPool(1);


    public Window() {
        Long key = getKey();
        for (int i = 0; i < SILENCE; i++) {
            QUEUE.addFirst(key - i);
            MAP.put(key-i, new AtomicInteger(0));
        }

        //每秒中滑动窗口向前移动一格,添加新的时间片,移除老的时间片,使整个窗口只持有3个时间片
        service.scheduleAtFixedRate(() -> {
            //添加新的时间片
            Long keyNew = getKey();
            QUEUE.addLast(keyNew);
            MAP.put(keyNew, new AtomicInteger(0));

            //移除老的时间片
            MAP.remove(QUEUE.getFirst());
            QUEUE.removeFirst();

            System.out.println("step:" + keyNew + ":" + MAP);

        }, 1000, 1000, TimeUnit.MILLISECONDS);
    }

    public void req() throws InterruptedException {
        Long key = getKey();
        //如果时间窗口未达到当前时间片,稍微等待一下,使得放置心跳滞后于当前请求
        while (QUEUE.getLast() < key) {
            Thread.sleep(200);
        }
        //检查窗口的总任务数与当前时间片的任务数,如果未达到上限,就可以开始执行任务,计数器加1
        //检查不通过这里时拒绝执行,实际业务酌情处理
        if (checkAll() && checkSilence()) {
            MAP.get(key).incrementAndGet();
            System.out.println("执行任务:" + key);
        } else {
            System.out.println("拒绝任务:" + key);
        }
    }

    public Boolean checkAll() {
        return MAP.values().stream().mapToInt(AtomicInteger::get).sum() < TOTAL_MAX;
    }

    public Boolean checkSilence() {
        Long key = getKey();
        return MAP.get(key).get() < SILENCE_MAX;
    }

    public Long getKey() {
        return System.currentTimeMillis() / 1000;
    }

    public static void main(String[] args) throws InterruptedException {
        Window window = new Window();

        //模拟10个离散请求,会因总数超过限制而限流
        for (int i = 0; i < 10; i++) {
            TimeUnit.MILLISECONDS.sleep(200);
            window.req();
        }

        //等待下一个滑动窗口,让各个片的计数器都归零
//        TimeUnit.SECONDS.sleep(3L);
//        System.out.println("---------------------");
//        for (int i = 0; i < 10; i++) {
//            window.req();
//        }

    }
}

运行输出如下:

可见整个窗口任务达到上限后,后续的请求被拒绝执行了。
4)应用
滑动窗口算法,在tcp协议发包过程中被使用。在web场景中,可以将流量控制做更细化处理,解决计数器限流控制力度太粗暴的问题。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/708264.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号