柚子快報激活碼778899分享:gin中使用限流中間件
柚子快報激活碼778899分享:gin中使用限流中間件
限流又稱為流量控制(流控),通常是指限制到達系統(tǒng)的并發(fā)請求數(shù),本文列舉了常見的限流策略,并以gin框架為例演示了如何為項目添加限流組件。
限流
限流又稱為流量控制(流控),通常是指限制到達系統(tǒng)的并發(fā)請求數(shù)。
我們生活中也會經(jīng)常遇到限流的場景,比如:某景區(qū)限制每日進入景區(qū)的游客數(shù)量為8萬人;沙河地鐵站早高峰通過站外排隊逐一放行的方式限制同一時間進入車站的旅客數(shù)量等。
限流雖然會影響部分用戶的使用體驗,但是卻能在一定程度上報障系統(tǒng)的穩(wěn)定性,不至于崩潰(大家都沒了用戶體驗)。
而互聯(lián)網(wǎng)上類似需要限流的業(yè)務(wù)場景也有很多,比如電商系統(tǒng)的秒殺、微博上突發(fā)熱點新聞、雙十一購物節(jié)、12306搶票等等。這些場景下的用戶請求量通常會激增,遠遠超過平時正常的請求量,此時如果不加任何限制很容易就會將后端服務(wù)打垮,影響服務(wù)的穩(wěn)定性。
此外,一些廠商公開的API服務(wù)通常也會限制用戶的請求次數(shù),比如百度地圖開放平臺等會根據(jù)用戶的付費情況來限制用戶的請求數(shù)等。
常用的限流策略
漏桶
漏桶法限流很好理解,假設(shè)我們有一個水桶按固定的速率向下方滴落一滴水,無論有多少請求,請求的速率有多大,都按照固定的速率流出,對應(yīng)到系統(tǒng)中就是按照固定的速率處理請求。
漏桶法的關(guān)鍵點在于漏桶始終按照固定的速率運行,但是它并不能很好的處理有大量突發(fā)請求的場景,畢竟在某些場景下我們可能需要提高系統(tǒng)的處理效率,而不是一味的按照固定速率處理請求。
關(guān)于漏桶的實現(xiàn),uber團隊有一個開源的github.com/uber-go/ratelimit庫。 這個庫的使用方法比較簡單,Take()?方法會返回漏桶下一次滴水的時間。
import (
"fmt"
"time"
"go.uber.org/ratelimit"
)
func main() {
rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
// Output:
// 0 0
// 1 10ms
// 2 10ms
// 3 10ms
// 4 10ms
// 5 10ms
// 6 10ms
// 7 10ms
// 8 10ms
// 9 10ms
}
它的源碼實現(xiàn)也比較簡單,這里大致說一下關(guān)鍵的地方,有興趣的同學(xué)可以自己去看一下完整的源碼。
限制器是一個接口類型,其要求實現(xiàn)一個Take()方法:
type Limiter interface {
// Take方法應(yīng)該阻塞已確保滿足 RPS
Take() time.Time
}
實現(xiàn)限制器接口的結(jié)構(gòu)體定義如下,這里可以重點留意下maxSlack字段,它在后面的Take()方法中的處理。
type limiter struct {
sync.Mutex // 鎖
last time.Time // 上一次的時刻
sleepFor time.Duration // 需要等待的時間
perRequest time.Duration // 每次的時間間隔
maxSlack time.Duration // 最大的富余量
clock Clock // 時鐘
}
limiter結(jié)構(gòu)體實現(xiàn)Limiter接口的Take()方法內(nèi)容如下:
// Take 會阻塞確保兩次請求之間的時間走完
// Take 調(diào)用平均數(shù)為 time.Second/rate.
func (t *limiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
// 如果是第一次請求就直接放行
if t.last.IsZero() {
t.last = now
return t.last
}
// sleepFor 根據(jù) perRequest 和上一次請求的時刻計算應(yīng)該sleep的時間
// 由于每次請求間隔的時間可能會超過perRequest, 所以這個數(shù)字可能為負數(shù),并在多個請求之間累加
t.sleepFor += t.perRequest - now.Sub(t.last)
// 我們不應(yīng)該讓sleepFor負的太多,因為這意味著一個服務(wù)在短時間內(nèi)慢了很多隨后會得到更高的RPS。
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
// 如果 sleepFor 是正值那么就 sleep
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
return t.last
}
上面的代碼根據(jù)記錄每次請求的間隔時間和上一次請求的時刻來計算當(dāng)次請求需要阻塞的時間——sleepFor,這里需要留意的是sleepFor的值可能為負,在經(jīng)過間隔時間長的兩次訪問之后會導(dǎo)致隨后大量的請求被放行,所以代碼中針對這個場景有專門的優(yōu)化處理。創(chuàng)建限制器的New()函數(shù)中會為maxSlack設(shè)置初始值,也可以通過WithoutSlack這個Option取消這個默認值。
func New(rate int, opts ...Option) Limiter {
l := &limiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: -10 * time.Second / time.Duration(rate),
}
for _, opt := range opts {
opt(l)
}
if l.clock == nil {
l.clock = clock.New()
}
return l
}
令牌桶
令牌桶其實和漏桶的原理類似,令牌桶按固定的速率往桶里放入令牌,并且只要能從桶里取出令牌就能通過,令牌桶支持突發(fā)流量的快速處理。
對于從桶里取不到令牌的場景,我們可以選擇等待也可以直接拒絕并返回。
對于令牌桶的Go語言實現(xiàn),大家可以參照github.com/juju/ratelimit庫。這個庫支持多種令牌桶模式,并且使用起來也比較簡單。
創(chuàng)建令牌桶的方法:
// 創(chuàng)建指定填充速率和容量大小的令牌桶
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
// 創(chuàng)建指定填充速率、容量大小和每次填充的令牌數(shù)的令牌桶
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
// 創(chuàng)建填充速度為指定速率和容量大小的令牌桶
// NewBucketWithRate(0.1, 200) 表示每秒填充20個令牌
func NewBucketWithRate(rate float64, capacity int64) *Bucket
取出令牌的方法如下:
// 取token(非阻塞)
func (tb *Bucket) Take(count int64) time.Duration
func (tb *Bucket) TakeAvailable(count int64) int64
// 最多等maxWait時間取token
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
// 取token(阻塞)
func (tb *Bucket) Wait(count int64)
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
雖說是令牌桶,但是我們沒有必要真的去生成令牌放到桶里,我們只需要每次來取令牌的時候計算一下,當(dāng)前是否有足夠的令牌就可以了,具體的計算方式可以總結(jié)為下面的公式:
當(dāng)前令牌數(shù) = 上一次剩余的令牌數(shù) + (本次取令牌的時刻-上一次取令牌的時刻)/放置令牌的時間間隔 * 每次放置的令牌數(shù)
github.com/juju/ratelimit這個庫中關(guān)于令牌數(shù)計算的源代碼如下:
func (tb *Bucket) currentTick(now time.Time) int64 {
return int64(now.Sub(tb.startTime) / tb.fillInterval)
}
func (tb *Bucket) adjustavailableTokens(tick int64) {
if tb.availableTokens >= tb.capacity {
return
}
tb.availableTokens += (tick - tb.latestTick) * tb.quantum
if tb.availableTokens > tb.capacity {
tb.availableTokens = tb.capacity
}
tb.latestTick = tick
return
}
獲取令牌的TakeAvailable()函數(shù)關(guān)鍵部分的源代碼如下:
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
if count <= 0 {
return 0
}
tb.adjustavailableTokens(tb.currentTick(now))
if tb.availableTokens <= 0 {
return 0
}
if count > tb.availableTokens {
count = tb.availableTokens
}
tb.availableTokens -= count
return count
}
大家從代碼中也可以看到其實令牌桶的實現(xiàn)并沒有很復(fù)雜。
gin框架中使用限流中間件
在gin框架構(gòu)建的項目中,我們可以將限流組件定義成中間件。
這里使用令牌桶作為限流策略,編寫一個限流中間件如下:
func RateLimitMiddleware(fillInterval time.Duration, cap int64) func(c *gin.Context) {
bucket := ratelimit.NewBucket(fillInterval, cap)
return func(c *gin.Context) {
// 如果取不到令牌就中斷本次請求返回 rate limit...
if bucket.TakeAvailable(1) < 1 {
c.String(http.StatusOK, "rate limit...")
c.Abort()
return
}
c.Next()
}
}
對于該限流中間件的注冊位置,我們可以按照不同的限流策略將其注冊到不同的位置,例如:
如果要對全站限流就可以注冊成全局的中間件。如果是某一組路由需要限流,那么就只需將該限流中間件注冊到對應(yīng)的路由組即可。
參考文章:
https://www.fansimao.com/937511.html
柚子快報激活碼778899分享:gin中使用限流中間件
參考文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。