什么是限流?1、在开发高并发系统时,我们可能会遇到接口访问频次过高,为了保证系统的高可用和稳定性,这时候就需要做流量限制。
2、有一些算子模型计算时间会耗时很长,并且耗资源,如果此时流量过高时处理不过来的。
3、可以用 Nginx 来控制请求,也可以用一些流行的类库实现。限流是高并发系统的一大杀器。
4、现在一般网关也会做限流。
限流是通过对并发访问请求进行限制,或者对一个时间窗口内的请求进行限速来保护服务,一旦达到限制速率需要采取措施。
- 直接拒绝服务:定向到错误页或者告知资源没有了
- 等待:可以放入队列中或者其他操作
- 降级处理:返回兜底数据或者默认数据
一般来说各种措施都可以用返回状态码进行表示,请求方可根据返回的状态码进行自定义操作
例如:
- 直接拒绝:5001 (请求方可进行重试操作retry)
- 等待:5002 (请求方可以不用做任何操作,等待服务器返回信息即可)
- 降级处理:5003 (服务器给返回的兜底数据,具体处理措施可双方进行商议)
限流最主要的目的就是为了保证自己的服务不被打挂,其次也可以作为服务可用率来衡量,比如可以和业务商量,qps为10,即1秒来10个请求,保证这10个请求都能够正常处理,或者允许一点错误,可用率为100%或者99.999%等等。
限流的常用算法?- 计数器算法
- 滑动窗口算法
- 漏桶算法
- 令牌桶算法
1、计数器算法
所谓计数器算法就是在一定的时间间隔内,对请求进行计数,当请求数量达到阙值后判断是否需要限流,当达到时间的临界点后,对计数器进行清零,并更新时间
- 设置一个变量 count,当过来一个请求count++,同时记录请求时间
- 当下一个请求来的时候判断 count 的值是否超过设定的阙值,如果没有超过阙值,不进行限流,如果超过阙值,需要判断当前请求的时间和第一次请求时间差值是否在计数周期内
- 如果两时间差在计数周期之内,此时就需要限流,拒绝请求
- 如果两时间差大于等于计数周期,此时就需要更新时间以及清空count的值
代码实现如下
package main
import (
"log"
"sync"
"time"
)
type Counter struct {
rate int //计数周期内最多允许的请求数
begin time.Time //计数开始时间
cycle time.Duration //计数周期
count int //计数周期内总请求数
lock sync.Mutex
}
// 判断是否被限流
func (l *Counter) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()
if l.count == l.rate-1 {
now := time.Now()
if now.Sub(l.begin) >= l.cycle {
//速度允许范围内, 重置计数器
l.Reset(now)
return true
} else {
return false
}
} else {
//没有达到速率限制,计数加1
l.count++
return true
}
}
// 初始化
func (l *Counter) Set(r int, cycle time.Duration) {
l.rate = r
l.begin = time.Now()
l.cycle = cycle
l.count = 0
}
//更新时间和清空count
func (l *Counter) Reset(t time.Time) {
l.begin = t
l.count = 0
}
func main() {
var wg sync.WaitGroup
var lr Counter
lr.Set(3, time.Second) // 1s内最多请求3次
for i := 0; i < 10; i++ {
wg.Add(1)
log.Println("创建请求:", i)
go func(i int) {
if lr.Allow() {
log.Println("响应请求:", i)
}
wg.Done()
}(i)
time.Sleep(200 * time.Millisecond)
}
wg.Wait()
}
响应结果如下
2021/02/01 21:16:12 创建请求: 0 2021/02/01 21:16:12 响应请求: 0 2021/02/01 21:16:12 创建请求: 1 2021/02/01 21:16:12 响应请求: 1 2021/02/01 21:16:12 创建请求: 2 2021/02/01 21:16:13 创建请求: 3 2021/02/01 21:16:13 创建请求: 4 2021/02/01 21:16:13 创建请求: 5 2021/02/01 21:16:13 响应请求: 5 2021/02/01 21:16:13 创建请求: 6 2021/02/01 21:16:13 响应请求: 6 2021/02/01 21:16:13 创建请求: 7 2021/02/01 21:16:13 响应请求: 7 2021/02/01 21:16:14 创建请求: 8 2021/02/01 21:16:14 创建请求: 9
通过响应结果可以看出,2、3、4、8、9的请求被丢弃,即被限流了
但是计数器算法存在一个问题?
一个API接口qps为100,此时在无限接近1s的时候请求了100次,那么到1s后count就清0了,但是此时请求还在继续,count会进行++,然后在上个1s完后立马又有100个请求到来,此时这个时间段内就是无限接近200个请求,会大于限流的两倍,这个是符合我们的设计逻辑的,这也是计数器方法的设计缺陷,系统可能会承受恶意用户的大量请求,甚至击穿系统。所以它也无法处理这种突发流量的情况,没有很好的处理单位时间的边界
2、滑动窗口算法
滑动窗口是针对计数器存在的临界点缺陷,所谓 滑动窗口(Sliding window) 是一种流量控制技术,这个词出现在 TCP 协议中。滑动窗口把固定时间片进行划分,并且随着时间的流逝,进行移动,固定数量的可以移动的格子,进行计数并判断阀值。
代码如下
package utils
import "time"
var LimitQueue map[string][]int64
var ok bool
//单机时间滑动窗口限流法
func LimitFreqSingle(queueName string, count uint, timeWindow int64) bool {
currTime := time.Now().Unix()
if LimitQueue == nil {
LimitQueue = make(map[string][]int64)
}
if _, ok = LimitQueue[queueName]; !ok {
LimitQueue[queueName] = make([]int64, 0)
}
//队列未满
if uint(len(LimitQueue[queueName])) < count {
LimitQueue[queueName] = append(LimitQueue[queueName], currTime)
return true
}
//队列满了,取出最早访问的时间
earlyTime := LimitQueue[queueName][0]
//说明最早期的时间还在时间窗口内,还没过期,所以不允许通过
if currTime-earlyTime <= timeWindow {
return false
} else {
//说明最早期的访问应该过期了,去掉最早期的
LimitQueue[queueName] = LimitQueue[queueName][1:]
LimitQueue[queueName] = append(LimitQueue[queueName], currTime)
}
return true
}
还是会有时间片的概念,无法根本即决临界点问题,具体实现算法可参考
相关算法实现 github.com/RussellLuo/slidingwindow
3、漏桶算法
漏桶算法(Leaky Bucket),原理就是一个固定容量的漏桶,按照固定速率流出水滴,即处理请求的速率是固定的,即使有突发流量也会因为漏桶容量不足而被丢弃。
代码实现如下
type LeakyBucket struct {
rate float64 //固定每秒出水速率
capacity float64 //桶的容量
water float64 //桶中当前水量
lastLeakMs int64 //桶上次漏水时间戳 ms
lock sync.Mutex
}
func (l *LeakyBucket) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()
now := time.Now().UnixNano() / 1e6
eclipse := float64((now - l.lastLeakMs)) * l.rate / 1000 //先执行漏水
l.water = l.water - eclipse //计算剩余水量
l.water = math.Max(0, l.water) //桶干了
l.lastLeakMs = now
if (l.water + 1) < l.capacity {
// 尝试加水,并且水还未满
l.water++
return true
} else {
// 水满,拒绝加水
return false
}
}
func (l *LeakyBucket) Set(r, c float64) {
l.rate = r
l.capacity = c
l.water = 0
l.lastLeakMs = time.Now().UnixNano() / 1e6
}
- 桶的的容量是固定的,并且流水速度也是固定的(处理请求的速率)
- 如果桶此时是空的,就不需要处理请求
- 可以以任意流水速度流进桶中(流量可以随便来,突发也可以)
- 桶满以后就不能装水了,会溢出(流量达到容量以后会被拒绝)
缺点:不能处理突发流量,但可以解决突发流量带来的问题
4、令牌桶算法
令牌桶算法(Token Bucket)是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。
令牌桶算法:有一个固定的桶,桶里面存放令牌,刚开始桶是空的,系统按照固定的时间会往桶里面放入固定的令牌,来一个请求会去桶里取令牌,如果桶里没有令牌了该请求就会被拒绝
type TokenBucket struct {
rate int64 //固定的token放入速率, r/s
capacity int64 //桶的容量
tokens int64 //桶中当前token数量
lastTokenSec int64 //桶上次放token的时间戳 s
lock sync.Mutex
}
func (l *TokenBucket) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()
now := time.Now().Unix()
l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate // 先添加令牌
if l.tokens > l.capacity {
l.tokens = l.capacity
}
l.lastTokenSec = now
if l.tokens > 0 {
// 还有令牌,领取令牌
l.tokens--
return true
} else {
// 没有令牌,则拒绝
return false
}
}
func (l *TokenBucket) Set(r, c int64) {
l.rate = r
l.capacity = c
l.tokens = 0
l.lastTokenSec = time.Now().Unix()
}
- 令牌桶按照一定的时间放入一定的令牌
- 桶中容量是有限的,如果桶满了,新加的令牌会被丢弃
- 桶中没有令牌了,请求会被限流丢弃
令牌桶算法其实限制的是平均流入速率,所以它可以允许一定的突发流量,不会存在其他问题



