為什么需要降載

微服務(wù)集群中,調(diào)用鏈路錯(cuò)綜復(fù)雜,作為服務(wù)提供者需要有一種保護(hù)自己的機(jī)制,防止調(diào)用方無腦調(diào)用壓垮自己,保證自身服務(wù)的高可用。

最常見的保護(hù)機(jī)制莫過于限流機(jī)制,使用限流器的前提是必須知道自身的能夠處理的最大并發(fā)數(shù),一般在上線前通過壓測來得到最大并發(fā)數(shù),而且日常請(qǐng)求過程中每個(gè)接口的限流參數(shù)都不一樣,同時(shí)系統(tǒng)一直在不斷的迭代其處理能力往往也會(huì)隨之變化,每次上線前都需要進(jìn)行壓測然后調(diào)整限流參數(shù)變得非常繁瑣。

那么有沒有一種更加簡潔的限流機(jī)制能實(shí)現(xiàn)最大限度的自我保護(hù)呢?

什么是自適應(yīng)降載

自適應(yīng)降載能非常智能的保護(hù)服務(wù)自身,根據(jù)服務(wù)自身的系統(tǒng)負(fù)載動(dòng)態(tài)判斷是否需要降載。

設(shè)計(jì)目標(biāo):

  1. 保證系統(tǒng)不被拖垮。
  2. 在系統(tǒng)穩(wěn)定的前提下,保持系統(tǒng)的吞吐量。

那么關(guān)鍵就在于如何衡量服務(wù)自身的負(fù)載呢?

判斷高負(fù)載主要取決于兩個(gè)指標(biāo):

  1. cpu 是否過載。
  2. 最大并發(fā)數(shù)是否過載。

以上兩點(diǎn)同時(shí)滿足時(shí)則說明服務(wù)處于高負(fù)載狀態(tài),則進(jìn)行自適應(yīng)降載。

同時(shí)也應(yīng)該注意高并發(fā)場景 cpu 負(fù)載、并發(fā)數(shù)往往波動(dòng)比較大,從數(shù)據(jù)上我們稱這種現(xiàn)象為毛刺,毛刺現(xiàn)象可能會(huì)導(dǎo)致系統(tǒng)一直在頻繁的進(jìn)行自動(dòng)降載操作,所以我們一般獲取一段時(shí)間內(nèi)的指標(biāo)均值來使指標(biāo)更加平滑。實(shí)現(xiàn)上可以采用準(zhǔn)確的記錄一段時(shí)間內(nèi)的指標(biāo)然后直接計(jì)算平均值,但是需要占用一定的系統(tǒng)資源。

統(tǒng)計(jì)學(xué)上有一種算法:滑動(dòng)平均(exponential moving average),可以用來估算變量的局部均值,使得變量的更新與歷史一段時(shí)間的歷史取值有關(guān),無需記錄所有的歷史局部變量就可以實(shí)現(xiàn)平均值估算,非常節(jié)省寶貴的服務(wù)器資源。

滑動(dòng)平均算法原理 參考這篇文章講的非常清楚。

變量 V 在 t 時(shí)刻記為 Vt,θt 為變量 V 在 t 時(shí)刻的取值,即在不使用滑動(dòng)平均模型時(shí) Vt=θt,在使用滑動(dòng)平均模型后,Vt 的更新公式如下:

Vt=β?Vt?1+(1?β)?θt

  • β = 0 時(shí) Vt = θt
  • β = 0.9 時(shí),大致相當(dāng)于過去 10 個(gè) θt 值的平均
  • β = 0.99 時(shí),大致相當(dāng)于過去 100 個(gè) θt 值的平均

代碼實(shí)現(xiàn)

接下來我們來看下 go-zero 自適應(yīng)降載的代碼實(shí)現(xiàn)。

core/load/adaptiveshedder.go

自適應(yīng)降載接口定義:

// 回調(diào)函數(shù)Promise interface {    // 請(qǐng)求成功時(shí)回調(diào)此函數(shù)    Pass()    // 請(qǐng)求失敗時(shí)回調(diào)此函數(shù)    Fail()}// 降載接口定義Shedder interface {    // 降載檢查    // 1. 允許調(diào)用,需手動(dòng)執(zhí)行 Promise.accept()/reject()上報(bào)實(shí)際執(zhí)行任務(wù)結(jié)構(gòu)    // 2. 拒絕調(diào)用,將會(huì)直接返回err:服務(wù)過載錯(cuò)誤 ErrServiceOverloaded    Allow() (Promise, error)}

接口定義非常精簡意味使用起來其實(shí)非常簡單,對(duì)外暴露一個(gè)`Allow()(Promise,error)。

go-zero 使用示例:

業(yè)務(wù)中只需調(diào)該方法判斷是否降載,如果被降載則直接結(jié)束流程,否則執(zhí)行業(yè)務(wù)最后使用返回值 Promise 根據(jù)執(zhí)行結(jié)果回調(diào)結(jié)果即可。

func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor {    ensureSheddingStat()    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,        handler grpc.UnaryHandler) (val interface{}, err error) {        sheddingStat.IncrementTotal()        var promise load.Promise        // 檢查是否被降載        promise, err = shedder.Allow()        // 降載,記錄相關(guān)日志與指標(biāo)        if err != nil {            metrics.AddDrop()            sheddingStat.IncrementDrop()            return        }        // 最后回調(diào)執(zhí)行結(jié)果        defer func() {            // 執(zhí)行失敗            if err == context.DeadlineExceeded {                promise.Fail()            // 執(zhí)行成功            } else {                sheddingStat.IncrementPass()                promise.Pass()            }        }()        // 執(zhí)行業(yè)務(wù)方法        return handler(ctx, req)    }}

接口實(shí)現(xiàn)類定義 :

主要包含三類屬性

  1. cpu 負(fù)載閾值:超過此值意味著 cpu 處于高負(fù)載狀態(tài)。
  2. 冷卻期:假如服務(wù)之前被降載過,那么將進(jìn)入冷卻期,目的在于防止降載過程中負(fù)載還未降下來立馬加壓導(dǎo)致來回抖動(dòng)。因?yàn)榻档拓?fù)載需要一定的時(shí)間,處于冷卻期內(nèi)應(yīng)該繼續(xù)檢查并發(fā)數(shù)是否超過限制,超過限制則繼續(xù)丟棄請(qǐng)求。
  3. 并發(fā)數(shù):當(dāng)前正在處理的并發(fā)數(shù),當(dāng)前正在處理的并發(fā)平均數(shù),以及最近一段內(nèi)的請(qǐng)求數(shù)與響應(yīng)時(shí)間,目的是為了計(jì)算當(dāng)前正在處理的并發(fā)數(shù)是否大于系統(tǒng)可承載的最大并發(fā)數(shù)。
// option參數(shù)模式ShedderOption func(opts *shedderOptions)// 可選配置參數(shù)shedderOptions struct {    // 滑動(dòng)時(shí)間窗口大小    window time.Duration    // 滑動(dòng)時(shí)間窗口數(shù)量    buckets int    // cpu負(fù)載臨界值    cpuThreshold int64}// 自適應(yīng)降載結(jié)構(gòu)體,需實(shí)現(xiàn) Shedder 接口adaptiveShedder struct {    // cpu負(fù)載臨界值    // 高于臨界值代表高負(fù)載需要降載保證服務(wù)    cpuThreshold int64    // 1s內(nèi)有多少個(gè)桶    windows int64    // 并發(fā)數(shù)    flying int64    // 滑動(dòng)平滑并發(fā)數(shù)    avgFlying float64    // 自旋鎖,一個(gè)服務(wù)共用一個(gè)降載    // 統(tǒng)計(jì)當(dāng)前正在處理的請(qǐng)求數(shù)時(shí)必須加鎖    // 無損并發(fā),提高性能    avgFlyingLock syncx.SpinLock    // 最后一次拒絕時(shí)間    dropTime *syncx.AtomicDuration    // 最近是否被拒絕過    droppedRecently *syncx.AtomicBool    // 請(qǐng)求數(shù)統(tǒng)計(jì),通過滑動(dòng)時(shí)間窗口記錄最近一段時(shí)間內(nèi)指標(biāo)    passCounter *collection.RollingWindow    // 響應(yīng)時(shí)間統(tǒng)計(jì),通過滑動(dòng)時(shí)間窗口記錄最近一段時(shí)間內(nèi)指標(biāo)    rtCounter *collection.RollingWindow}

自適應(yīng)降載構(gòu)造器:

func NewAdaptiveShedder(opts ...ShedderOption) Shedder {    // 為了保證代碼統(tǒng)一    // 當(dāng)開發(fā)者關(guān)閉時(shí)返回默認(rèn)的空實(shí)現(xiàn),實(shí)現(xiàn)代碼統(tǒng)一    // go-zero很多地方都采用了這種設(shè)計(jì),比如Breaker,日志組件    if !enabled.True() {        return newNopShedder()    }    // options模式設(shè)置可選配置參數(shù)    options := shedderOptions{        // 默認(rèn)統(tǒng)計(jì)最近5s內(nèi)數(shù)據(jù)        window: defaultWindow,        // 默認(rèn)桶數(shù)量50個(gè)        buckets:      defaultBuckets,        // cpu負(fù)載        cpuThreshold: defaultCpuThreshold,    }    for _, opt := range opts {        opt(&options)    }    // 計(jì)算每個(gè)窗口間隔時(shí)間,默認(rèn)為100ms    bucketDuration := options.window / time.Duration(options.buckets)    return &adaptiveShedder{        // cpu負(fù)載        cpuThreshold:    options.cpuThreshold,        // 1s的時(shí)間內(nèi)包含多少個(gè)滑動(dòng)窗口單元        windows:         int64(time.Second / bucketDuration),        // 最近一次拒絕時(shí)間        dropTime:        syncx.NewAtomicDuration(),        // 最近是否被拒絕過        droppedRecently: syncx.NewAtomicBool(),        // qps統(tǒng)計(jì),滑動(dòng)時(shí)間窗口        // 忽略當(dāng)前正在寫入窗口(桶),時(shí)間周期不完整可能導(dǎo)致數(shù)據(jù)異常        passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,            collection.IgnoreCurrentBucket()),        // 響應(yīng)時(shí)間統(tǒng)計(jì),滑動(dòng)時(shí)間窗口        // 忽略當(dāng)前正在寫入窗口(桶),時(shí)間周期不完整可能導(dǎo)致數(shù)據(jù)異常        rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,            collection.IgnoreCurrentBucket()),    }}

降載檢查 Allow()

檢查當(dāng)前請(qǐng)求是否應(yīng)該被丟棄,被丟棄業(yè)務(wù)側(cè)需要直接中斷請(qǐng)求保護(hù)服務(wù),也意味著降載生效同時(shí)進(jìn)入冷卻期。如果放行則返回 promise,等待業(yè)務(wù)側(cè)執(zhí)行回調(diào)函數(shù)執(zhí)行指標(biāo)統(tǒng)計(jì)。

// 降載檢查func (as *adaptiveShedder) Allow() (Promise, error) {    // 檢查請(qǐng)求是否被丟棄    if as.shouldDrop() {        // 設(shè)置drop時(shí)間        as.dropTime.Set(timex.Now())        // 最近已被drop        as.droppedRecently.Set(true)        // 返回過載        return nil, ErrServiceOverloaded    }    // 正在處理請(qǐng)求數(shù)加1    as.addFlying(1)    // 這里每個(gè)允許的請(qǐng)求都會(huì)返回一個(gè)新的promise對(duì)象    // promise內(nèi)部持有了降載指針對(duì)象    return &promise{        start:   timex.Now(),        shedder: as,    }, nil}

檢查是否應(yīng)該被丟棄shouldDrop()

// 請(qǐng)求是否應(yīng)該被丟棄func (as *adaptiveShedder) shouldDrop() bool {    // 當(dāng)前cpu負(fù)載超過閾值    // 服務(wù)處于冷卻期內(nèi)應(yīng)該繼續(xù)檢查負(fù)載并嘗試丟棄請(qǐng)求    if as.systemOverloaded() || as.stillHot() {        // 檢查正在處理的并發(fā)是否超出當(dāng)前可承載的最大并發(fā)數(shù)        // 超出則丟棄請(qǐng)求        if as.highThru() {            flying := atomic.LoadInt64(&as.flying)            as.avgFlyingLock.Lock()            avgFlying := as.avgFlying            as.avgFlyingLock.Unlock()            msg := fmt.Sprintf(                "dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",                stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)            logx.Error(msg)            stat.Report(msg)            return true        }    }    return false}

cpu 閾值檢查 systemOverloaded()

cpu 負(fù)載值計(jì)算算法采用的滑動(dòng)平均算法,防止毛刺現(xiàn)象。每隔 250ms 采樣一次 β 為 0.95,大概相當(dāng)于歷史 20 次 cpu 負(fù)載的平均值,時(shí)間周期約為 5s。

// cpu 是否過載func (as *adaptiveShedder) systemOverloaded() bool {    return systemOverloadChecker(as.cpuThreshold)}// cpu 檢查函數(shù)systemOverloadChecker = func(cpuThreshold int64) bool {        return stat.CpuUsage() >= cpuThreshold}// cpu滑動(dòng)平均值curUsage := internal.RefreshCpu()prevUsage := atomic.LoadInt64(&cpuUsage)// cpu = cpu??1 * beta + cpu? * (1 - beta)// 滑動(dòng)平均算法usage := int64(float64(prevUsage)*beta + float64(curUsage)*(1-beta))atomic.StoreInt64(&cpuUsage, usage)

檢查是否處于冷卻期 stillHot:

判斷當(dāng)前系統(tǒng)是否處于冷卻期,如果處于冷卻期內(nèi),應(yīng)該繼續(xù)嘗試檢查是否丟棄請(qǐng)求。主要是防止系統(tǒng)在過載恢復(fù)過程中負(fù)載還未降下來,立馬又增加壓力導(dǎo)致來回抖動(dòng),此時(shí)應(yīng)該嘗試?yán)^續(xù)丟棄請(qǐng)求。

func (as *adaptiveShedder) stillHot() bool {    // 最近沒有丟棄請(qǐng)求    // 說明服務(wù)正常    if !as.droppedRecently.True() {        return false    }    // 不在冷卻期    dropTime := as.dropTime.Load()    if dropTime == 0 {        return false    }    // 冷卻時(shí)間默認(rèn)為1s    hot := timex.Since(dropTime) < coolOffDuration    // 不在冷卻期,正常處理請(qǐng)求中    if !hot {        // 重置drop記錄        as.droppedRecently.Set(false)    }    return hot}

檢查當(dāng)前正在處理的并發(fā)數(shù)highThru()

一旦 當(dāng)前處理的并發(fā)數(shù) > 并發(fā)數(shù)承載上限 則進(jìn)入降載狀態(tài)。

這里為什么要加鎖呢?因?yàn)樽赃m應(yīng)降載時(shí)全局在使用的,為了保證并發(fā)數(shù)平均值正確性。

為什么這里要加自旋鎖呢?因?yàn)椴l(fā)處理過程中,可以不阻塞其他的 goroutine 執(zhí)行任務(wù),采用無鎖并發(fā)提高性能。

func (as *adaptiveShedder) highThru() bool {    // 加鎖    as.avgFlyingLock.Lock()    // 獲取滑動(dòng)平均值    // 每次請(qǐng)求結(jié)束后更新    avgFlying := as.avgFlying    // 解鎖    as.avgFlyingLock.Unlock()    // 系統(tǒng)此時(shí)最大并發(fā)數(shù)    maxFlight := as.maxFlight()    // 正在處理的并發(fā)數(shù)和平均并發(fā)數(shù)是否大于系統(tǒng)的最大并發(fā)數(shù)    return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight}

如何得到正在處理的并發(fā)數(shù)與平均并發(fā)數(shù)呢?

當(dāng)前正在的處理并發(fā)數(shù)統(tǒng)計(jì)其實(shí)非常簡單,每次允許請(qǐng)求時(shí)并發(fā)數(shù) +1,請(qǐng)求完成后 通過 promise 對(duì)象回調(diào)-1 即可,并利用滑動(dòng)平均算法求解平均并發(fā)數(shù)即可。

type promise struct {    // 請(qǐng)求開始時(shí)間    // 統(tǒng)計(jì)請(qǐng)求處理耗時(shí)    start   time.Duration    shedder *adaptiveShedder}func (p *promise) Fail() {    // 請(qǐng)求結(jié)束,當(dāng)前正在處理請(qǐng)求數(shù)-1    p.shedder.addFlying(-1)}func (p *promise) Pass() {    // 響應(yīng)時(shí)間,單位毫秒    rt := float64(timex.Since(p.start)) / float64(time.Millisecond)    // 請(qǐng)求結(jié)束,當(dāng)前正在處理請(qǐng)求數(shù)-1    p.shedder.addFlying(-1)    p.shedder.rtCounter.Add(math.Ceil(rt))    p.shedder.passCounter.Add(1)}func (as *adaptiveShedder) addFlying(delta int64) {    flying := atomic.AddInt64(&as.flying, delta)    // 請(qǐng)求結(jié)束后,統(tǒng)計(jì)當(dāng)前正在處理的請(qǐng)求并發(fā)    if delta < 0 {        as.avgFlyingLock.Lock()        // 估算當(dāng)前服務(wù)近一段時(shí)間內(nèi)的平均請(qǐng)求數(shù)        as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta)        as.avgFlyingLock.Unlock()    }}

得到了當(dāng)前的系統(tǒng)數(shù)還不夠 ,我們還需要知道當(dāng)前系統(tǒng)能夠處理并發(fā)數(shù)的上限,即最大并發(fā)數(shù)。

請(qǐng)求通過數(shù)與響應(yīng)時(shí)間都是通過滑動(dòng)窗口來實(shí)現(xiàn)的,關(guān)于滑動(dòng)窗口的實(shí)現(xiàn)可以參考 自適應(yīng)熔斷器那篇文章。

當(dāng)前系統(tǒng)的最大并發(fā)數(shù) = 窗口單位時(shí)間內(nèi)的最大通過數(shù)量 * 窗口單位時(shí)間內(nèi)的最小響應(yīng)時(shí)間。

// 計(jì)算每秒系統(tǒng)的最大并發(fā)數(shù)// 最大并發(fā)數(shù) = 最大請(qǐng)求數(shù)(qps)* 最小響應(yīng)時(shí)間(rt)func (as *adaptiveShedder) maxFlight() int64 {    // windows = buckets per second    // maxQPS = maxPASS * windows    // minRT = min average response time in milliseconds    // maxQPS * minRT / milliseconds_per_second    // as.maxPass()*as.windows - 每個(gè)桶最大的qps * 1s內(nèi)包含桶的數(shù)量    // as.minRt()/1e3 - 窗口所有桶中最小的平均響應(yīng)時(shí)間 / 1000ms這里是為了轉(zhuǎn)換成秒    return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))}    // 滑動(dòng)時(shí)間窗口內(nèi)有多個(gè)桶// 找到請(qǐng)求數(shù)最多的那個(gè)// 每個(gè)桶占用的時(shí)間為 internal ms// qps指的是1s內(nèi)的請(qǐng)求數(shù),qps: maxPass * time.Second/internalfunc (as *adaptiveShedder) maxPass() int64 {    var result float64 = 1    // 當(dāng)前時(shí)間窗口內(nèi)請(qǐng)求數(shù)最多的桶    as.passCounter.Reduce(func(b *collection.Bucket) {        if b.Sum > result {            result = b.Sum        }    })    return int64(result)}// 滑動(dòng)時(shí)間窗口內(nèi)有多個(gè)桶// 計(jì)算最小的平均響應(yīng)時(shí)間// 因?yàn)樾枰?jì)算近一段時(shí)間內(nèi)系統(tǒng)能夠處理的最大并發(fā)數(shù)func (as *adaptiveShedder) minRt() float64 {    // 默認(rèn)為1000ms    result := defaultMinRt    as.rtCounter.Reduce(func(b *collection.Bucket) {        if b.Count <= 0 {            return        }        // 請(qǐng)求平均響應(yīng)時(shí)間        avg := math.Round(b.Sum / float64(b.Count))        if avg < result {            result = avg        }    })    return result}

參考資料

Google BBR 擁塞控制算法

滑動(dòng)平均算法原理

go-zero 自適應(yīng)降載

項(xiàng)目地址

https://github.com/zeromicro/go-zero

歡迎使用 go-zerostar 支持我們!

微信交流群

關(guān)注『微服務(wù)實(shí)踐』公眾號(hào)并點(diǎn)擊 交流群 獲取社區(qū)群二維碼。