摘要:代碼實現(xiàn)代碼實現(xiàn)接下來思考一個熔斷器如何實現(xiàn)。同時熔斷器的狀態(tài)也需要依靠指標統(tǒng)計來實現(xiàn)可觀測性,我們實現(xiàn)任何系統(tǒng)第一步需要考慮就是可觀測性,不然系統(tǒng)就是一個黑盒。可能是,熔斷器需要實時收集此數(shù)據(jù)。熔斷方法,自動上報執(zhí)行結(jié)果自動擋。。。
微服務(wù)集群中,每個應(yīng)用基本都會依賴一定數(shù)量的外部服務(wù)。有可能隨時都會遇到網(wǎng)絡(luò)連接緩慢,超時,依賴服務(wù)過載,服務(wù)不可用的情況,在高并發(fā)場景下如果此時調(diào)用方不做任何處理,繼續(xù)持續(xù)請求故障服務(wù)的話很容易引起整個微服務(wù)集群雪崩。
比如高并發(fā)場景的用戶訂單服務(wù),一般需要依賴一下服務(wù):
假如此時 賬戶服務(wù) 過載,訂單服務(wù)持續(xù)請求賬戶服務(wù)只能被動的等待賬戶服務(wù)報錯或者請求超時,進而導致訂單請求被大量堆積,這些無效請求依然會占用系統(tǒng)資源:cpu,內(nèi)存,數(shù)據(jù)連接...導致訂單服務(wù)整體不可用。即使賬戶服務(wù)恢復了訂單服務(wù)也無法自我恢復。
這時如果有一個主動保護機制應(yīng)對這種場景的話訂單服務(wù)至少可以保證自身的運行狀態(tài),等待賬戶服務(wù)恢復時訂單服務(wù)也同步自我恢復,這種自我保護機制在服務(wù)治理中叫熔斷機制。
熔斷
熔斷是調(diào)用方自我保護的機制(客觀上也能保護被調(diào)用方),熔斷對象是外部服務(wù)。
降級
降級是被調(diào)用方(服務(wù)提供者)的防止因自身資源不足導致過載的自我保護機制,降級對象是自身。
熔斷這一詞來源時我們?nèi)粘I铍娐防锩娴娜蹟嗥?,當負載過高時(電流過大)保險絲會自行熔斷防止電路被燒壞,很多技術(shù)都是來自生活場景的提煉。
熔斷器一般具有三個狀態(tài):
使用較多的熔斷組件:
基于上面提到的熔斷器原理,項目中我們要使用好熔斷器通常需要準備以下參數(shù):
實際上可選的配置參數(shù)還有非常非常多,參考 https://resilience4j.readme.io/docs/circuitbreaker
對于經(jīng)驗不夠豐富的開發(fā)人員而言,這些參數(shù)設(shè)置多少合適心里其實并沒有底。
那么有沒有一種自適應(yīng)的熔斷算法能讓我們不關(guān)注參數(shù),只要簡單配置就能滿足大部分場景?
其實是有的,google sre提供了一種自適應(yīng)熔斷算法來計算丟棄請求的概率:
算法參數(shù):
算法解釋:
接下來思考一個熔斷器如何實現(xiàn)。
初步思路是:
下面來逐步分析 go-zero 的源碼實現(xiàn):
core/breaker/breaker.go
兵馬未動,糧草先行,明確了需求后就可以開始規(guī)劃定義接口了,接口是我們編碼思維抽象的第一步也是最重要的一步。
核心定義包含兩種類型的方法:
Allow():需要手動回調(diào)請求結(jié)果至熔斷器,相當于手動擋。
DoXXX():自動回調(diào)請求結(jié)果至熔斷器,相當于自動擋,實際上 DoXXX() 類型方法最后都是調(diào)用DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
// 自定義判定執(zhí)行結(jié)果 Acceptable func(err error) bool // 手動回調(diào) Promise interface { // Accept tells the Breaker that the call is successful. // 請求成功 Accept() // Reject tells the Breaker that the call is failed. // 請求失敗 Reject(reason string) } Breaker interface { // 熔斷器名稱 Name() string // 熔斷方法,執(zhí)行請求時必須手動上報執(zhí)行結(jié)果 // 適用于簡單無需自定義快速失敗,無需自定義判定請求結(jié)果的場景 // 相當于手動擋。。。 Allow() (Promise, error) // 熔斷方法,自動上報執(zhí)行結(jié)果 // 自動擋。。。 Do(req func() error) error // 熔斷方法 // acceptable - 支持自定義判定執(zhí)行結(jié)果 DoWithAcceptable(req func() error, acceptable Acceptable) error // 熔斷方法 // fallback - 支持自定義快速失敗 DoWithFallback(req func() error, fallback func(err error) error) error // 熔斷方法 // fallback - 支持自定義快速失敗 // acceptable - 支持自定義判定執(zhí)行結(jié)果 DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error }
circuitBreaker 繼承 throttle,實際上這里相當于靜態(tài)代理,代理模式可以在不改變原有對象的基礎(chǔ)上增強功能,后面我們會看到 go-zero 這樣做的原因是為了收集熔斷器錯誤數(shù)據(jù),也就是為了實現(xiàn)可觀測性。
熔斷器實現(xiàn)采用靜態(tài)代理模式,看起來稍微有點繞腦。
// 熔斷器結(jié)構(gòu)體circuitBreaker struct { name string // 實際上 circuitBreaker熔斷功能都代理給 throttle來實現(xiàn) throttle}// 熔斷器接口throttle interface { // 熔斷方法 allow() (Promise, error) // 熔斷方法 // DoXXX()方法最終都會該方法 doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error} func (cb *circuitBreaker) Allow() (Promise, error) { return cb.throttle.allow()} func (cb *circuitBreaker) Do(req func() error) error { return cb.throttle.doReq(req, nil, defaultAcceptable)} func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error { return cb.throttle.doReq(req, nil, acceptable)} func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error { return cb.throttle.doReq(req, fallback, defaultAcceptable)} func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error { return cb.throttle.doReq(req, fallback, acceptable)}
throttle 接口實現(xiàn)類:
loggedThrottle 增加了為了收集錯誤日志的滾動窗口,目的是為了收集當請求失敗時的錯誤日志。
// 帶日志功能的熔斷器type loggedThrottle struct { // 名稱 name string // 代理對象 internalThrottle // 滾動窗口,滾動收集數(shù)據(jù),相當于環(huán)形數(shù)組 errWin *errorWindow}// 熔斷方法func (lt loggedThrottle) allow() (Promise, error) { promise, err := lt.internalThrottle.allow() return promiseWithReason{ promise: promise, errWin: lt.errWin, }, lt.logError(err)}// 熔斷方法func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error { return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool { accept := acceptable(err) if !accept { lt.errWin.add(err.Error()) } return accept }))}func (lt loggedThrottle) logError(err error) error { if err == ErrServiceUnavailable { // if circuit open, not possible to have empty error window stat.Report(fmt.Sprintf( "proc(%s/%d), callee: %s, breaker is open and requests dropped/nlast errors:/n%s", proc.ProcessName(), proc.Pid(), lt.name, lt.errWin)) } return err}
errorWindow 是一個環(huán)形數(shù)組,新數(shù)據(jù)不斷滾動覆蓋最舊的數(shù)據(jù),通過取余實現(xiàn)。
// 滾動窗口type errorWindow struct { reasons [numHistoryReasons]string index int count int lock sync.Mutex}// 添加數(shù)據(jù)func (ew *errorWindow) add(reason string) { ew.lock.Lock() // 添加錯誤日志 ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason) // 更新index,為下一次寫入數(shù)據(jù)做準備 // 這里用的取模實現(xiàn)了滾動功能 ew.index = (ew.index + 1) % numHistoryReasons // 統(tǒng)計數(shù)量 ew.count = mathx.MinInt(ew.count+1, numHistoryReasons) ew.lock.Unlock()}// 格式化錯誤日志func (ew *errorWindow) String() string { var reasons []string ew.lock.Lock() // reverse order for i := ew.index - 1; i >= ew.index-ew.count; i-- { reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons]) } ew.lock.Unlock() return strings.Join(reasons, "/n")}
看到這里我們還沒看到實際的熔斷器實現(xiàn),實際上真正的熔斷操作被代理給了 internalThrottle 對象。
internalThrottle interface { allow() (internalPromise, error) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error }
type googleBreaker struct { // 敏感度,go-zero中默認值為1.5 k float64 // 滑動窗口,用于記錄最近一段時間內(nèi)的請求總數(shù),成功總數(shù) stat *collection.RollingWindow // 概率生成器 // 隨機產(chǎn)生0.0-1.0之間的雙精度浮點數(shù) proba *mathx.Proba}
可以看到熔斷器屬性其實非常簡單,數(shù)據(jù)統(tǒng)計采用的是滑動時間窗口來實現(xiàn)。
滑動窗口屬于比較通用的數(shù)據(jù)結(jié)構(gòu),常用于最近一段時間內(nèi)的行為數(shù)據(jù)統(tǒng)計。
它的實現(xiàn)非常有意思,尤其是如何模擬窗口滑動過程。
先來看滑動窗口的結(jié)構(gòu)體定義:
RollingWindow struct { // 互斥鎖 lock sync.RWMutex // 滑動窗口數(shù)量 size int // 窗口,數(shù)據(jù)容器 win *window // 滑動窗口單元時間間隔 interval time.Duration // 游標,用于定位當前應(yīng)該寫入哪個bucket offset int // 匯總數(shù)據(jù)時,是否忽略當前正在寫入桶的數(shù)據(jù) // 某些場景下因為當前正在寫入的桶數(shù)據(jù)并沒有經(jīng)過完整的窗口時間間隔 // 可能導致當前桶的統(tǒng)計并不準確 ignoreCurrent bool // 最后寫入桶的時間 // 用于計算下一次寫入數(shù)據(jù)間隔最后一次寫入數(shù)據(jù)的之間 // 經(jīng)過了多少個時間間隔 lastTime time.Duration }
window 是數(shù)據(jù)的實際存儲位置,其實就是一個數(shù)組,提供向指定 offset 添加數(shù)據(jù)與清除操作。
數(shù)組里面按照 internal 時間間隔分隔成多個 bucket。
// 時間窗口type window struct { // 桶 // 一個桶標識一個時間間隔 buckets []*Bucket // 窗口大小 size int}// 添加數(shù)據(jù)// offset - 游標,定位寫入bucket位置// v - 行為數(shù)據(jù)func (w *window) add(offset int, v float64) { w.buckets[offset%w.size].add(v)}// 匯總數(shù)據(jù)// fn - 自定義的bucket統(tǒng)計函數(shù)func (w *window) reduce(start, count int, fn func(b *Bucket)) { for i := 0; i < count; i++ { fn(w.buckets[(start+i)%w.size]) }}// 清理特定bucketfunc (w *window) resetBucket(offset int) { w.buckets[offset%w.size].reset()}// 桶type Bucket struct { // 當前桶內(nèi)值之和 Sum float64 // 當前桶的add總次數(shù) Count int64}// 向桶添加數(shù)據(jù)func (b *Bucket) add(v float64) { // 求和 b.Sum += v // 次數(shù)+1 b.Count++}// 桶數(shù)據(jù)清零func (b *Bucket) reset() { b.Sum = 0 b.Count = 0}
window 添加數(shù)據(jù):
// 添加數(shù)據(jù)func (rw *RollingWindow) Add(v float64) { rw.lock.Lock() defer rw.lock.Unlock() // 獲取當前寫入的下標 rw.updateOffset() // 添加數(shù)據(jù) rw.win.add(rw.offset, v)}// 計算當前距離最后寫入數(shù)據(jù)經(jīng)過多少個單元時間間隔// 實際上指的就是經(jīng)過多少個桶func (rw *RollingWindow) span() int { offset := int(timex.Since(rw.lastTime) / rw.interval) if 0 <= offset && offset < rw.size { return offset } // 大于時間窗口時 返回窗口大小即可 return rw.size}// 更新當前時間的offset// 實現(xiàn)窗口滑動func (rw *RollingWindow) updateOffset() { // 經(jīng)過span個桶的時間 span := rw.span() // 還在同一單元時間內(nèi)不需要更新 if span <= 0 { return } offset := rw.offset // 既然經(jīng)過了span個桶的時間沒有寫入數(shù)據(jù) // 那么這些桶內(nèi)的數(shù)據(jù)就不應(yīng)該繼續(xù)保留了,屬于過期數(shù)據(jù)清空即可 // 可以看到這里全部用的 % 取余操作,可以實現(xiàn)按照下標周期性寫入 // 如果超出下標了那就從頭開始寫,確保新數(shù)據(jù)一定能夠正常寫入 // 類似循環(huán)數(shù)組的效果 for i := 0; i < span; i++ { rw.win.resetBucket((offset + i + 1) % rw.size) } // 更新offset rw.offset = (offset + span) % rw.size now := timex.Now() // 更新操作時間 // 這里很有意思 rw.lastTime = now - (now-rw.lastTime)%rw.interval}
window 統(tǒng)計數(shù)據(jù):
// 歸納匯總數(shù)據(jù)func (rw *RollingWindow) Reduce(fn func(b *Bucket)) { rw.lock.RLock() defer rw.lock.RUnlock() var diff int span := rw.span() // 當前時間截止前,未過期桶的數(shù)量 if span == 0 && rw.ignoreCurrent { diff = rw.size - 1 } else { diff = rw.size - span } if diff > 0 { // rw.offset - rw.offset+span之間的桶數(shù)據(jù)是過期的不應(yīng)該計入統(tǒng)計 offset := (rw.offset + span + 1) % rw.size // 匯總數(shù)據(jù) rw.win.reduce(offset, diff, fn) }}
// 按照最近一段時間的請求數(shù)據(jù)計算是否熔斷func (b *googleBreaker) accept() error { // 獲取最近一段時間的統(tǒng)計數(shù)據(jù) accepts, total := b.history() // 計算動態(tài)熔斷概率 weightedAccepts := b.k * float64(accepts) // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101 dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1)) // 概率為0,通過 if dropRatio <= 0 { return nil } // 隨機產(chǎn)生0.0-1.0之間的隨機數(shù)與上面計算出來的熔斷概率相比較 // 如果隨機數(shù)比熔斷概率小則進行熔斷 if b.proba.TrueOnProba(dropRatio) { return ErrServiceUnavailable } return nil}
熔斷器對外暴露兩種類型的方法
func (b *googleBreaker) allow() (internalPromise, error)
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
Acceptable 參數(shù)目的是自定義判斷請求是否成功。
Acceptable func(err error) bool
// 熔斷方法// 返回一個promise異步回調(diào)對象,可由開發(fā)者自行決定是否上報結(jié)果到熔斷器func (b *googleBreaker) allow() (internalPromise, error) { if err := b.accept(); err != nil { return nil, err } return googlePromise{ b: b, }, nil}// 熔斷方法// req - 熔斷對象方法// fallback - 自定義快速失敗函數(shù),可對熔斷產(chǎn)生的err進行包裝后返回// acceptable - 對本次未熔斷時執(zhí)行請求的結(jié)果進行自定義的判定,比如可以針對http.code,rpc.code,body.codefunc (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error { // 判定是否熔斷 if err := b.accept(); err != nil { // 熔斷中,如果有自定義的fallback則執(zhí)行 if fallback != nil { return fallback(err) } return err } // 如果執(zhí)行req()過程發(fā)生了panic,依然判定本次執(zhí)行失敗上報至熔斷器 defer func() { if e := recover(); e != nil { b.markFailure() panic(e) } }() // 執(zhí)行請求 err := req() // 判定請求成功 if acceptable(err) { b.markSuccess() } else { b.markFailure() } return err}// 上報成功func (b *googleBreaker) markSuccess() { b.stat.Add(1)}// 上報失敗func (b *googleBreaker) markFailure() { b.stat.Add(0)}// 統(tǒng)計數(shù)據(jù)func (b *googleBreaker) history() (accepts, total int64) { b.stat.Reduce(func(b *collection.Bucket) { accepts += int64(b.Sum) total += b.Count }) return}
https://github.com/zeromicro/go-zero
歡迎使用 go-zero
并 star 支持我們!
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/123760.html
摘要:這時候超過定時器設(shè)定的時間就會再次發(fā)送丟包的數(shù)據(jù)直到對端響應(yīng),所以需要每次都備份發(fā)送的數(shù)據(jù)。 UDP 面向報文 UDP 是一個面向報文(報文可以理解為一段段的數(shù)據(jù))的協(xié)議。意思就是 UDP 只是報文的搬運工,不會對報文進行任何拆分和拼接操作。 具體來說 在發(fā)送端,應(yīng)用層將數(shù)據(jù)傳遞給傳輸層的 UDP 協(xié)議,UDP 只會給數(shù)據(jù)增加一個 UDP 頭標識下是 UDP 協(xié)議,然后就傳遞給網(wǎng)絡(luò)層...
摘要:這時候超過定時器設(shè)定的時間就會再次發(fā)送丟包的數(shù)據(jù)直到對端響應(yīng),所以需要每次都備份發(fā)送的數(shù)據(jù)。 UDP 面向報文 UDP 是一個面向報文(報文可以理解為一段段的數(shù)據(jù))的協(xié)議。意思就是 UDP 只是報文的搬運工,不會對報文進行任何拆分和拼接操作。 具體來說 在發(fā)送端,應(yīng)用層將數(shù)據(jù)傳遞給傳輸層的 UDP 協(xié)議,UDP 只會給數(shù)據(jù)增加一個 UDP 頭標識下是 UDP 協(xié)議,然后就傳遞給網(wǎng)絡(luò)層...
摘要:第三天,太監(jiān)傳話欽天監(jiān)求見一日無事。第四天,欽天監(jiān)一日無事。然后所有的競爭線程放棄自旋,逐個插入到對象里的一個隊列尾部,進入阻塞狀態(tài)。 微信公眾號:IT一刻鐘大型現(xiàn)實非嚴肅主義現(xiàn)場一刻鐘與你分享優(yōu)質(zhì)技術(shù)架構(gòu)與見聞,做一個有劇情的程序員關(guān)注可第一時間了解更多精彩內(nèi)容,定期有福利相送喲。 showImg(https://segmentfault.com/img/bVbrgsJ?w=900...
摘要:實現(xiàn)熔斷降級注解除了可以用來做限流控制之外,還能實現(xiàn)與類似的熔斷降級策略。函數(shù)簽名要求返回值類型必須與原函數(shù)返回值類型一致方法參數(shù)列表需要為空,或者可以額外多一個類型的參數(shù)用于接收對應(yīng)的異常。若未配置和,則被限流降級時會將直接拋出。 在之前的《使用Sentinel實現(xiàn)接口限流》一文中,我們僅依靠引入Spring Cloud Alibaba對Sentinel的整合封裝spring-clo...
閱讀 2443·2021-11-18 10:07
閱讀 2390·2021-09-22 15:59
閱讀 3149·2021-08-23 09:42
閱讀 2361·2019-08-30 15:44
閱讀 1251·2019-08-29 15:06
閱讀 2420·2019-08-29 13:27
閱讀 1289·2019-08-29 13:21
閱讀 1510·2019-08-29 13:13