為什么需要降載
微服務(wù)集群中,調(diào)用鏈路錯(cuò)綜復(fù)雜,作為服務(wù)提供者需要有一種保護(hù)自己的機(jī)制,防止調(diào)用方無腦調(diào)用壓垮自己,保證自身服務(wù)的高可用。
最常見的保護(hù)機(jī)制莫過于限流機(jī)制,使用限流器的前提是必須知道自身的能夠處理的最大并發(fā)數(shù),一般在上線前通過壓測來得到最大并發(fā)數(shù),而且日常請(qǐng)求過程中每個(gè)接口的限流參數(shù)都不一樣,同時(shí)系統(tǒng)一直在不斷的迭代其處理能力往往也會(huì)隨之變化,每次上線前都需要進(jìn)行壓測然后調(diào)整限流參數(shù)變得非常繁瑣。
那么有沒有一種更加簡潔的限流機(jī)制能實(shí)現(xiàn)最大限度的自我保護(hù)呢?
什么是自適應(yīng)降載
自適應(yīng)降載能非常智能的保護(hù)服務(wù)自身,根據(jù)服務(wù)自身的系統(tǒng)負(fù)載動(dòng)態(tài)判斷是否需要降載。
設(shè)計(jì)目標(biāo):
- 保證系統(tǒng)不被拖垮。
- 在系統(tǒng)穩(wěn)定的前提下,保持系統(tǒng)的吞吐量。
那么關(guān)鍵就在于如何衡量服務(wù)自身的負(fù)載呢?
判斷高負(fù)載主要取決于兩個(gè)指標(biāo):
- cpu 是否過載。
- 最大并發(fā)數(shù)是否過載。
以上兩點(diǎn)同時(shí)滿足時(shí)則說明服務(wù)處于高負(fù)載狀態(tài),則進(jìn)行自適應(yīng)降載。
同時(shí)也應(yīng)該注意高并發(fā)場景 cpu 負(fù)載、并發(fā)數(shù)往往波動(dòng)比較大,從數(shù)據(jù)上我們稱這種現(xiàn)象為毛刺,毛刺現(xiàn)象可能會(huì)導(dǎo)致系統(tǒng)一直在頻繁的進(jìn)行自動(dòng)降載操作,所以我們一般獲取一段時(shí)間內(nèi)的指標(biāo)均值來使指標(biāo)更加平滑。實(shí)現(xiàn)上可以采用準(zhǔn)確的記錄一段時(shí)間內(nèi)的指標(biāo)然后直接計(jì)算平均值,但是需要占用一定的系統(tǒng)資源。
統(tǒng)計(jì)學(xué)上有一種算法:滑動(dòng)平均(exponential moving average),可以用來估算變量的局部均值,使得變量的更新與歷史一段時(shí)間的歷史取值有關(guān),無需記錄所有的歷史局部變量就可以實(shí)現(xiàn)平均值估算,非常節(jié)省寶貴的服務(wù)器資源。
滑動(dòng)平均算法原理 參考這篇文章講的非常清楚。
變量 V 在 t 時(shí)刻記為 Vt,θt 為變量 V 在 t 時(shí)刻的取值,即在不使用滑動(dòng)平均模型時(shí) Vt=θt,在使用滑動(dòng)平均模型后,Vt 的更新公式如下:
Vt=β?Vt?1+(1?β)?θt
- β = 0 時(shí) Vt = θt
- β = 0.9 時(shí),大致相當(dāng)于過去 10 個(gè) θt 值的平均
- β = 0.99 時(shí),大致相當(dāng)于過去 100 個(gè) θt 值的平均
代碼實(shí)現(xiàn)
接下來我們來看下 go-zero 自適應(yīng)降載的代碼實(shí)現(xiàn)。
core/load/adaptiveshedder.go
自適應(yīng)降載接口定義:
// 回調(diào)函數(shù)Promise interface { // 請(qǐng)求成功時(shí)回調(diào)此函數(shù) Pass() // 請(qǐng)求失敗時(shí)回調(diào)此函數(shù) Fail()}// 降載接口定義Shedder interface { // 降載檢查 // 1. 允許調(diào)用,需手動(dòng)執(zhí)行 Promise.accept()/reject()上報(bào)實(shí)際執(zhí)行任務(wù)結(jié)構(gòu) // 2. 拒絕調(diào)用,將會(huì)直接返回err:服務(wù)過載錯(cuò)誤 ErrServiceOverloaded Allow() (Promise, error)}
接口定義非常精簡意味使用起來其實(shí)非常簡單,對(duì)外暴露一個(gè)`Allow()(Promise,error)。
go-zero 使用示例:
業(yè)務(wù)中只需調(diào)該方法判斷是否降載,如果被降載則直接結(jié)束流程,否則執(zhí)行業(yè)務(wù)最后使用返回值 Promise 根據(jù)執(zhí)行結(jié)果回調(diào)結(jié)果即可。
func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor { ensureSheddingStat() return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (val interface{}, err error) { sheddingStat.IncrementTotal() var promise load.Promise // 檢查是否被降載 promise, err = shedder.Allow() // 降載,記錄相關(guān)日志與指標(biāo) if err != nil { metrics.AddDrop() sheddingStat.IncrementDrop() return } // 最后回調(diào)執(zhí)行結(jié)果 defer func() { // 執(zhí)行失敗 if err == context.DeadlineExceeded { promise.Fail() // 執(zhí)行成功 } else { sheddingStat.IncrementPass() promise.Pass() } }() // 執(zhí)行業(yè)務(wù)方法 return handler(ctx, req) }}
接口實(shí)現(xiàn)類定義 :
主要包含三類屬性
- cpu 負(fù)載閾值:超過此值意味著 cpu 處于高負(fù)載狀態(tài)。
- 冷卻期:假如服務(wù)之前被降載過,那么將進(jìn)入冷卻期,目的在于防止降載過程中負(fù)載還未降下來立馬加壓導(dǎo)致來回抖動(dòng)。因?yàn)榻档拓?fù)載需要一定的時(shí)間,處于冷卻期內(nèi)應(yīng)該繼續(xù)檢查并發(fā)數(shù)是否超過限制,超過限制則繼續(xù)丟棄請(qǐng)求。
- 并發(fā)數(shù):當(dāng)前正在處理的并發(fā)數(shù),當(dāng)前正在處理的并發(fā)平均數(shù),以及最近一段內(nèi)的請(qǐng)求數(shù)與響應(yīng)時(shí)間,目的是為了計(jì)算當(dāng)前正在處理的并發(fā)數(shù)是否大于系統(tǒng)可承載的最大并發(fā)數(shù)。
// option參數(shù)模式ShedderOption func(opts *shedderOptions)// 可選配置參數(shù)shedderOptions struct { // 滑動(dòng)時(shí)間窗口大小 window time.Duration // 滑動(dòng)時(shí)間窗口數(shù)量 buckets int // cpu負(fù)載臨界值 cpuThreshold int64}// 自適應(yīng)降載結(jié)構(gòu)體,需實(shí)現(xiàn) Shedder 接口adaptiveShedder struct { // cpu負(fù)載臨界值 // 高于臨界值代表高負(fù)載需要降載保證服務(wù) cpuThreshold int64 // 1s內(nèi)有多少個(gè)桶 windows int64 // 并發(fā)數(shù) flying int64 // 滑動(dòng)平滑并發(fā)數(shù) avgFlying float64 // 自旋鎖,一個(gè)服務(wù)共用一個(gè)降載 // 統(tǒng)計(jì)當(dāng)前正在處理的請(qǐng)求數(shù)時(shí)必須加鎖 // 無損并發(fā),提高性能 avgFlyingLock syncx.SpinLock // 最后一次拒絕時(shí)間 dropTime *syncx.AtomicDuration // 最近是否被拒絕過 droppedRecently *syncx.AtomicBool // 請(qǐng)求數(shù)統(tǒng)計(jì),通過滑動(dòng)時(shí)間窗口記錄最近一段時(shí)間內(nèi)指標(biāo) passCounter *collection.RollingWindow // 響應(yīng)時(shí)間統(tǒng)計(jì),通過滑動(dòng)時(shí)間窗口記錄最近一段時(shí)間內(nèi)指標(biāo) rtCounter *collection.RollingWindow}
自適應(yīng)降載構(gòu)造器:
func NewAdaptiveShedder(opts ...ShedderOption) Shedder { // 為了保證代碼統(tǒng)一 // 當(dāng)開發(fā)者關(guān)閉時(shí)返回默認(rèn)的空實(shí)現(xiàn),實(shí)現(xiàn)代碼統(tǒng)一 // go-zero很多地方都采用了這種設(shè)計(jì),比如Breaker,日志組件 if !enabled.True() { return newNopShedder() } // options模式設(shè)置可選配置參數(shù) options := shedderOptions{ // 默認(rèn)統(tǒng)計(jì)最近5s內(nèi)數(shù)據(jù) window: defaultWindow, // 默認(rèn)桶數(shù)量50個(gè) buckets: defaultBuckets, // cpu負(fù)載 cpuThreshold: defaultCpuThreshold, } for _, opt := range opts { opt(&options) } // 計(jì)算每個(gè)窗口間隔時(shí)間,默認(rèn)為100ms bucketDuration := options.window / time.Duration(options.buckets) return &adaptiveShedder{ // cpu負(fù)載 cpuThreshold: options.cpuThreshold, // 1s的時(shí)間內(nèi)包含多少個(gè)滑動(dòng)窗口單元 windows: int64(time.Second / bucketDuration), // 最近一次拒絕時(shí)間 dropTime: syncx.NewAtomicDuration(), // 最近是否被拒絕過 droppedRecently: syncx.NewAtomicBool(), // qps統(tǒng)計(jì),滑動(dòng)時(shí)間窗口 // 忽略當(dāng)前正在寫入窗口(桶),時(shí)間周期不完整可能導(dǎo)致數(shù)據(jù)異常 passCounter: collection.NewRollingWindow(options.buckets, bucketDuration, collection.IgnoreCurrentBucket()), // 響應(yīng)時(shí)間統(tǒng)計(jì),滑動(dòng)時(shí)間窗口 // 忽略當(dāng)前正在寫入窗口(桶),時(shí)間周期不完整可能導(dǎo)致數(shù)據(jù)異常 rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration, collection.IgnoreCurrentBucket()), }}
降載檢查 Allow()
:
檢查當(dāng)前請(qǐng)求是否應(yīng)該被丟棄,被丟棄業(yè)務(wù)側(cè)需要直接中斷請(qǐng)求保護(hù)服務(wù),也意味著降載生效同時(shí)進(jìn)入冷卻期。如果放行則返回 promise,等待業(yè)務(wù)側(cè)執(zhí)行回調(diào)函數(shù)執(zhí)行指標(biāo)統(tǒng)計(jì)。
// 降載檢查func (as *adaptiveShedder) Allow() (Promise, error) { // 檢查請(qǐng)求是否被丟棄 if as.shouldDrop() { // 設(shè)置drop時(shí)間 as.dropTime.Set(timex.Now()) // 最近已被drop as.droppedRecently.Set(true) // 返回過載 return nil, ErrServiceOverloaded } // 正在處理請(qǐng)求數(shù)加1 as.addFlying(1) // 這里每個(gè)允許的請(qǐng)求都會(huì)返回一個(gè)新的promise對(duì)象 // promise內(nèi)部持有了降載指針對(duì)象 return &promise{ start: timex.Now(), shedder: as, }, nil}
檢查是否應(yīng)該被丟棄shouldDrop()
:
// 請(qǐng)求是否應(yīng)該被丟棄func (as *adaptiveShedder) shouldDrop() bool { // 當(dāng)前cpu負(fù)載超過閾值 // 服務(wù)處于冷卻期內(nèi)應(yīng)該繼續(xù)檢查負(fù)載并嘗試丟棄請(qǐng)求 if as.systemOverloaded() || as.stillHot() { // 檢查正在處理的并發(fā)是否超出當(dāng)前可承載的最大并發(fā)數(shù) // 超出則丟棄請(qǐng)求 if as.highThru() { flying := atomic.LoadInt64(&as.flying) as.avgFlyingLock.Lock() avgFlying := as.avgFlying as.avgFlyingLock.Unlock() msg := fmt.Sprintf( "dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f", stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying) logx.Error(msg) stat.Report(msg) return true } } return false}
cpu 閾值檢查 systemOverloaded()
:
cpu 負(fù)載值計(jì)算算法采用的滑動(dòng)平均算法,防止毛刺現(xiàn)象。每隔 250ms 采樣一次 β 為 0.95,大概相當(dāng)于歷史 20 次 cpu 負(fù)載的平均值,時(shí)間周期約為 5s。
// cpu 是否過載func (as *adaptiveShedder) systemOverloaded() bool { return systemOverloadChecker(as.cpuThreshold)}// cpu 檢查函數(shù)systemOverloadChecker = func(cpuThreshold int64) bool { return stat.CpuUsage() >= cpuThreshold}// cpu滑動(dòng)平均值curUsage := internal.RefreshCpu()prevUsage := atomic.LoadInt64(&cpuUsage)// cpu = cpu??1 * beta + cpu? * (1 - beta)// 滑動(dòng)平均算法usage := int64(float64(prevUsage)*beta + float64(curUsage)*(1-beta))atomic.StoreInt64(&cpuUsage, usage)
檢查是否處于冷卻期 stillHot
:
判斷當(dāng)前系統(tǒng)是否處于冷卻期,如果處于冷卻期內(nèi),應(yīng)該繼續(xù)嘗試檢查是否丟棄請(qǐng)求。主要是防止系統(tǒng)在過載恢復(fù)過程中負(fù)載還未降下來,立馬又增加壓力導(dǎo)致來回抖動(dòng),此時(shí)應(yīng)該嘗試?yán)^續(xù)丟棄請(qǐng)求。
func (as *adaptiveShedder) stillHot() bool { // 最近沒有丟棄請(qǐng)求 // 說明服務(wù)正常 if !as.droppedRecently.True() { return false } // 不在冷卻期 dropTime := as.dropTime.Load() if dropTime == 0 { return false } // 冷卻時(shí)間默認(rèn)為1s hot := timex.Since(dropTime) < coolOffDuration // 不在冷卻期,正常處理請(qǐng)求中 if !hot { // 重置drop記錄 as.droppedRecently.Set(false) } return hot}
檢查當(dāng)前正在處理的并發(fā)數(shù)highThru()
:
一旦 當(dāng)前處理的并發(fā)數(shù) > 并發(fā)數(shù)承載上限 則進(jìn)入降載狀態(tài)。
這里為什么要加鎖呢?因?yàn)樽赃m應(yīng)降載時(shí)全局在使用的,為了保證并發(fā)數(shù)平均值正確性。
為什么這里要加自旋鎖呢?因?yàn)椴l(fā)處理過程中,可以不阻塞其他的 goroutine 執(zhí)行任務(wù),采用無鎖并發(fā)提高性能。
func (as *adaptiveShedder) highThru() bool { // 加鎖 as.avgFlyingLock.Lock() // 獲取滑動(dòng)平均值 // 每次請(qǐng)求結(jié)束后更新 avgFlying := as.avgFlying // 解鎖 as.avgFlyingLock.Unlock() // 系統(tǒng)此時(shí)最大并發(fā)數(shù) maxFlight := as.maxFlight() // 正在處理的并發(fā)數(shù)和平均并發(fā)數(shù)是否大于系統(tǒng)的最大并發(fā)數(shù) return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight}
如何得到正在處理的并發(fā)數(shù)與平均并發(fā)數(shù)呢?
當(dāng)前正在的處理并發(fā)數(shù)統(tǒng)計(jì)其實(shí)非常簡單,每次允許請(qǐng)求時(shí)并發(fā)數(shù) +1,請(qǐng)求完成后 通過 promise 對(duì)象回調(diào)-1 即可,并利用滑動(dòng)平均算法求解平均并發(fā)數(shù)即可。
type promise struct { // 請(qǐng)求開始時(shí)間 // 統(tǒng)計(jì)請(qǐng)求處理耗時(shí) start time.Duration shedder *adaptiveShedder}func (p *promise) Fail() { // 請(qǐng)求結(jié)束,當(dāng)前正在處理請(qǐng)求數(shù)-1 p.shedder.addFlying(-1)}func (p *promise) Pass() { // 響應(yīng)時(shí)間,單位毫秒 rt := float64(timex.Since(p.start)) / float64(time.Millisecond) // 請(qǐng)求結(jié)束,當(dāng)前正在處理請(qǐng)求數(shù)-1 p.shedder.addFlying(-1) p.shedder.rtCounter.Add(math.Ceil(rt)) p.shedder.passCounter.Add(1)}func (as *adaptiveShedder) addFlying(delta int64) { flying := atomic.AddInt64(&as.flying, delta) // 請(qǐng)求結(jié)束后,統(tǒng)計(jì)當(dāng)前正在處理的請(qǐng)求并發(fā) if delta < 0 { as.avgFlyingLock.Lock() // 估算當(dāng)前服務(wù)近一段時(shí)間內(nèi)的平均請(qǐng)求數(shù) as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta) as.avgFlyingLock.Unlock() }}
得到了當(dāng)前的系統(tǒng)數(shù)還不夠 ,我們還需要知道當(dāng)前系統(tǒng)能夠處理并發(fā)數(shù)的上限,即最大并發(fā)數(shù)。
請(qǐng)求通過數(shù)與響應(yīng)時(shí)間都是通過滑動(dòng)窗口來實(shí)現(xiàn)的,關(guān)于滑動(dòng)窗口的實(shí)現(xiàn)可以參考 自適應(yīng)熔斷器
那篇文章。
當(dāng)前系統(tǒng)的最大并發(fā)數(shù) = 窗口單位時(shí)間內(nèi)的最大通過數(shù)量 * 窗口單位時(shí)間內(nèi)的最小響應(yīng)時(shí)間。
// 計(jì)算每秒系統(tǒng)的最大并發(fā)數(shù)// 最大并發(fā)數(shù) = 最大請(qǐng)求數(shù)(qps)* 最小響應(yīng)時(shí)間(rt)func (as *adaptiveShedder) maxFlight() int64 { // windows = buckets per second // maxQPS = maxPASS * windows // minRT = min average response time in milliseconds // maxQPS * minRT / milliseconds_per_second // as.maxPass()*as.windows - 每個(gè)桶最大的qps * 1s內(nèi)包含桶的數(shù)量 // as.minRt()/1e3 - 窗口所有桶中最小的平均響應(yīng)時(shí)間 / 1000ms這里是為了轉(zhuǎn)換成秒 return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))} // 滑動(dòng)時(shí)間窗口內(nèi)有多個(gè)桶// 找到請(qǐng)求數(shù)最多的那個(gè)// 每個(gè)桶占用的時(shí)間為 internal ms// qps指的是1s內(nèi)的請(qǐng)求數(shù),qps: maxPass * time.Second/internalfunc (as *adaptiveShedder) maxPass() int64 { var result float64 = 1 // 當(dāng)前時(shí)間窗口內(nèi)請(qǐng)求數(shù)最多的桶 as.passCounter.Reduce(func(b *collection.Bucket) { if b.Sum > result { result = b.Sum } }) return int64(result)}// 滑動(dòng)時(shí)間窗口內(nèi)有多個(gè)桶// 計(jì)算最小的平均響應(yīng)時(shí)間// 因?yàn)樾枰?jì)算近一段時(shí)間內(nèi)系統(tǒng)能夠處理的最大并發(fā)數(shù)func (as *adaptiveShedder) minRt() float64 { // 默認(rèn)為1000ms result := defaultMinRt as.rtCounter.Reduce(func(b *collection.Bucket) { if b.Count <= 0 { return } // 請(qǐng)求平均響應(yīng)時(shí)間 avg := math.Round(b.Sum / float64(b.Count)) if avg < result { result = avg } }) return result}
參考資料
項(xiàng)目地址
https://github.com/zeromicro/go-zero
歡迎使用 go-zero
并 star 支持我們!
微信交流群
關(guān)注『微服務(wù)實(shí)踐』公眾號(hào)并點(diǎn)擊 交流群 獲取社區(qū)群二維碼。