Go官方限流器介绍使用

限流

限制某服务在一段时间可以被调用的频率。
一般用于防止客户端请求过多,导致服务端拥堵的手段,也是防止ddos的一种手段。
常用限流算法有三种:

  • 滑动限流
  • 漏斗限流
  • 令牌桶限流
    time/rate是官方实现的令牌桶算法限流器

    Go官方限流time/rate

    安装

    go get golang.org/x/time/rate

    构造限流器相关函数

    type Limit float64
    
    func Every(interval time.Duration) Limit {  
      if interval <= 0 {  
         return Inf  
      }  
      return 1 / Limit(interval.Seconds())  
    }
    
    func NewLimiter(r Limit, b int) *Limiter {  
      return &Limiter{  
         limit: r,  
         burst: b,  
      }  
    }

    NewLimiter有两个参数:
    第一个是填充令牌的速度,默认已秒为单位。如r=10,则说明这个限流器会每秒填充10个令牌。
    第二个是为桶的容量,即同一个时刻,能从限流器中拿到多少个令牌。

例1:

limiter := rate.NewLimiter(10, 1)

表示每秒往桶里放10个token,桶的容量为1。

例2:

limit := Every(100 * time.Millisecond);
limiter := NewLimiter(limit, 1);

使用了Every自定义了速率,为100ms放入1个令牌

主要方法

WaitN

// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is  
// canceled, or the expected wait time exceeds the Context's Deadline.  
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {  
    ...
}

当桶内拿走n个令牌,当令牌不足n时,方法堵塞。
当n大于设置的桶容量、上下文超时和上下文超时时,返回错误

Wait

// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {  
    return lim.WaitN(ctx, 1)  
}

WaitN方法n=1的实现

AllowN

// AllowN reports whether n events may happen at time t.
// Use this method if you intend to drop / skip events that exceed the rate limit.  
// Otherwise use Reserve or Wait.  
func (lim *Limiter) AllowN(t time.Time, n int) bool {  
    ...
}

在t这个时刻,从桶中拿走n个令牌,如果能够成功拿到,返回true,否则返回false,方法不堵塞。

Allow

// Allow reports whether an event may happen now.
func (lim *Limiter) Allow() bool {  
    return lim.AllowN(time.Now(), 1)  
}

AllowN方法t=time.Now()n=1的实现

ReserveN / Reserve

// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.  
func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {  
    r := lim.reserveN(t, n, InfDuration)
    return &r
}

// Reserve is shorthand for ReserveN(time.Now(), 1).  
func (lim *Limiter) Reserve() *Reservation {  
    return lim.ReserveN(time.Now(), 1)  
}

ReserveN与Reserve都是返回Reservation结构体,结构体有如下方法:

func (r *Reservation) OK() bool // 判断是否获取到n个令牌
func (r *Reservation) DelayFrom(now time.Time) time.Duration // 从now开始需要休眠多久
func (r *Reservation) Delay() time.Duration // DelayFrom的简写,now=time.Now()
func (r *Reservation) CancelAt(now time.Time) // 取消,将获取的Token重新放入桶中 
func (r *Reservation) Cancel() // CancelAt的简写,now=time.Now()

WaitN与AllowN可以说都是对ReserveN的封装实现

使用场景

  • WaitN / Wait:希望事件不丢失,超出速率的方法堵塞
  • AllowN / Allow:超出速率的事件直接丢弃,方法不堵塞
  • ReserveN / Reserve:自定义限流方案

动态修改速率与桶容量

func (lim *Limiter) SetLimit(newLimit Limit) //改变放入Token的速率 
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) 
func (lim *Limiter) SetBurst(newBurst int) // 改变Token桶大小 
func (lim *Limiter) SetBurstAt(now time.Time, newBurst int)

使用示例

Wait
limiter := rate.NewLimiter(1, 1)  
wg := sync.WaitGroup{}
m := sync.Mutex{}
start = time.Now()  
sum = 0
for i := 0; i < 10; i++ {  
    wg.Add(1)  
    go func() {  
       defer wg.Done()  
       limiter.Wait(context.Background())  
       m.Lock()  
       defer m.Unlock()  
       sum += 1  
    }()  
}  
wg.Wait()
fmt.Println("Wait 限流", sum, time.Since(start))

输出:

Wait 限流 10 11s
Allow
limiter := rate.NewLimiter(1, 1)  
wg := sync.WaitGroup{}
m := sync.Mutex{}
start = time.Now()  
sum = 0
for i := 0; i < 10; i++ {  
    wg.Add(1)  
    go func() {  
        defer wg.Done()
        ok := limiter.Allow()  
        if ok {  
            m.Lock()  
            defer m.Unlock()  
            sum += 1  
        }
    }()  
}  
wg.Wait()
fmt.Println("Allow 限流", sum, time.Since(start))

输出:

Allow 限流 1 2s
Reserve
limiter := rate.NewLimiter(1, 1)  
wg := sync.WaitGroup{}
m := sync.Mutex{}
start = time.Now()  
sum = 0
for i := 0; i < 10; i++ {  
    wg.Add(1)  
    go func() {  
        defer wg.Done()  
        r := limiter.Reserve()  
        if !r.OK() { // 是否获取到token  
            return  
        }  
        time.Sleep(r.Delay())  
        m.Lock()  
        defer m.Unlock()  
        sum += 1
    }()  
}  
wg.Wait()
fmt.Println("Reserve 限流", sum, time.Since(start))

输出:

Reserve 限流 10 11s