限流
限制某服务在一段时间可以被调用的频率。
一般用于防止客户端请求过多,导致服务端拥堵的手段,也是防止ddos的一种手段。
常用限流算法有三种:
- 滑动限流
- 漏斗限流
令牌桶限流
time/rate
是官方实现的令牌桶算法限流器Go官方限流time/rate
安装
go get golang.org/x/time/rate
构造限流器相关函数
type Limit float64 func Every(interval time.Duration) Limit { if interval <= 0 { return Inf } return 1 / Limit(interval.Seconds()) } func NewLimiter(r Limit, b int) *Limiter { return &Limiter{ limit: r, burst: b, } }
NewLimiter
有两个参数:
第一个是填充令牌的速度,默认已秒为单位。如r=10
,则说明这个限流器会每秒填充10个令牌。
第二个是为桶的容量,即同一个时刻,能从限流器中拿到多少个令牌。
例1:
limiter := rate.NewLimiter(10, 1)
表示每秒往桶里放10个token,桶的容量为1。
例2:
limit := Every(100 * time.Millisecond);
limiter := NewLimiter(limit, 1);
使用了Every
自定义了速率,为100ms放入1个令牌
主要方法
WaitN
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
...
}
当桶内拿走n个令牌,当令牌不足n时,方法堵塞。
当n大于设置的桶容量、上下文超时和上下文超时时,返回错误
Wait
// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {
return lim.WaitN(ctx, 1)
}
为WaitN
方法n=1
的实现
AllowN
// AllowN reports whether n events may happen at time t.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(t time.Time, n int) bool {
...
}
在t这个时刻,从桶中拿走n个令牌,如果能够成功拿到,返回true
,否则返回false
,方法不堵塞。
Allow
// Allow reports whether an event may happen now.
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
为AllowN
方法t=time.Now()
,n=1
的实现
ReserveN / Reserve
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {
r := lim.reserveN(t, n, InfDuration)
return &r
}
// Reserve is shorthand for ReserveN(time.Now(), 1).
func (lim *Limiter) Reserve() *Reservation {
return lim.ReserveN(time.Now(), 1)
}
ReserveN与Reserve都是返回Reservation结构体,结构体有如下方法:
func (r *Reservation) OK() bool // 判断是否获取到n个令牌
func (r *Reservation) DelayFrom(now time.Time) time.Duration // 从now开始需要休眠多久
func (r *Reservation) Delay() time.Duration // DelayFrom的简写,now=time.Now()
func (r *Reservation) CancelAt(now time.Time) // 取消,将获取的Token重新放入桶中
func (r *Reservation) Cancel() // CancelAt的简写,now=time.Now()
WaitN与AllowN可以说都是对ReserveN的封装实现
使用场景
- WaitN / Wait:希望事件不丢失,超出速率的方法堵塞
- AllowN / Allow:超出速率的事件直接丢弃,方法不堵塞
- ReserveN / Reserve:自定义限流方案
动态修改速率与桶容量
func (lim *Limiter) SetLimit(newLimit Limit) //改变放入Token的速率
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
func (lim *Limiter) SetBurst(newBurst int) // 改变Token桶大小
func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)
使用示例
Wait
limiter := rate.NewLimiter(1, 1)
wg := sync.WaitGroup{}
m := sync.Mutex{}
start = time.Now()
sum = 0
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
limiter.Wait(context.Background())
m.Lock()
defer m.Unlock()
sum += 1
}()
}
wg.Wait()
fmt.Println("Wait 限流", sum, time.Since(start))
输出:
Wait 限流 10 11s
Allow
limiter := rate.NewLimiter(1, 1)
wg := sync.WaitGroup{}
m := sync.Mutex{}
start = time.Now()
sum = 0
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
ok := limiter.Allow()
if ok {
m.Lock()
defer m.Unlock()
sum += 1
}
}()
}
wg.Wait()
fmt.Println("Allow 限流", sum, time.Since(start))
输出:
Allow 限流 1 2s
Reserve
limiter := rate.NewLimiter(1, 1)
wg := sync.WaitGroup{}
m := sync.Mutex{}
start = time.Now()
sum = 0
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
r := limiter.Reserve()
if !r.OK() { // 是否获取到token
return
}
time.Sleep(r.Delay())
m.Lock()
defer m.Unlock()
sum += 1
}()
}
wg.Wait()
fmt.Println("Reserve 限流", sum, time.Since(start))
输出:
Reserve 限流 10 11s