Flink具備高吞吐、低延遲、純流式架構(gòu)、支持對(duì)亂序事件的處理、有狀態(tài)、高度靈活的窗口定制、失敗恢復(fù)、故障轉(zhuǎn)移、水平擴(kuò)展、批處理、流處理統(tǒng)一的API等大數(shù)據(jù)處理優(yōu)勢(shì)?;诖髷?shù)據(jù)的應(yīng)用場(chǎng)景中,從數(shù)據(jù)生產(chǎn),到數(shù)據(jù)收集、數(shù)據(jù)處理、數(shù)據(jù)應(yīng)用,貫穿整個(gè)大數(shù)據(jù)生命周期全棧的每個(gè)環(huán)節(jié),F(xiàn)link 均可應(yīng)用其中。作為新一代開(kāi)源大數(shù)據(jù)計(jì)算引擎,F(xiàn)link 不僅滿足海量數(shù)據(jù)對(duì)實(shí)時(shí)性的需求,且能夠全鏈路打通端到端的數(shù)據(jù)價(jià)值挖掘。
基于開(kāi)源組件的架構(gòu)如果能實(shí)現(xiàn)性能最優(yōu)化,那就是高潮迭起,掌聲不斷,如果架構(gòu)性能不能最優(yōu)化,那就是爺爺趕牛車,急死孫子。
筆者所在項(xiàng)目的日志綜管平臺(tái)架構(gòu)使用了Flink組件,遇到了實(shí)時(shí)計(jì)算延遲的性能問(wèn)題,下面筆者將和團(tuán)隊(duì)一起解決該性能問(wèn)題的過(guò)程分享如下。
運(yùn)行模式 | Flink on yarn |
Fink | 1.7.2 |
Hadoop | 2.8.5 |
計(jì)算節(jié)點(diǎn)數(shù) | 9臺(tái)虛擬機(jī)(單臺(tái)cpu:12核內(nèi)存:64GB 硬盤:550G) |
kafka | 2.1 |
業(yè)務(wù)高峰期處理數(shù)據(jù)量(每分鐘) | 860w |
生成指標(biāo) | 30個(gè) |
跑的任務(wù)數(shù) | 8個(gè) |
如圖所示,flink處理的業(yè)務(wù)鏈數(shù)據(jù)量從某一時(shí)間點(diǎn)突然出現(xiàn)斷崖式下降,數(shù)據(jù)處理積壓越來(lái)嚴(yán)重,flink 任務(wù)背壓較高,同時(shí)指標(biāo)出現(xiàn)延時(shí)生成現(xiàn)象(正常處理延時(shí)1分以內(nèi))。
首先通過(guò)查看flink業(yè)務(wù)鏈處理日志,發(fā)現(xiàn)疑似線索。日志顯示任務(wù)連接上游kafka報(bào)Disconnection連接異常現(xiàn)象。當(dāng)指標(biāo)延時(shí)時(shí),此錯(cuò)誤信息報(bào)頻率較高,但指標(biāo)不延時(shí)偶爾也會(huì)報(bào)錯(cuò),是否這就是導(dǎo)致問(wèn)題的罪魁禍?zhǔn)祝扛鶕?jù)這一線索,繼續(xù)刨根問(wèn)底:
分析及措施:
上游kafka采用kerberos認(rèn)證機(jī)制,每隔24小時(shí)需要重新認(rèn)證(調(diào)用專有客戶端進(jìn)行認(rèn)證),flink 9臺(tái)計(jì)算節(jié)點(diǎn)上部署自動(dòng)認(rèn)證腳本,每隔10分鐘程序自動(dòng)認(rèn)證,Disconnection連接異?,F(xiàn)象出現(xiàn)頻率減少,但指標(biāo)延時(shí)情況還在存在。
調(diào)整flink 連接kafka消費(fèi)topic參數(shù)
default.api.timeout.ms
session.timeout.ms
request.timeout.ms
fetch.max.wait.ms
fetch.min.bytes
調(diào)整連接參數(shù)后Disconnection連接異常現(xiàn)象未出現(xiàn),但指標(biāo)延時(shí)現(xiàn)象依然存在。
通過(guò)監(jiān)測(cè)上游kafka topic 消費(fèi)分組Lag值,發(fā)現(xiàn)是下游消費(fèi)滯后造成數(shù)據(jù)積壓現(xiàn)象。
分析結(jié)論:通過(guò)以上監(jiān)測(cè)與優(yōu)化措施,指標(biāo)生成延遲問(wèn)題仍未解決,排除由Kafka引起指標(biāo)延時(shí)的可能性。
通過(guò)上述優(yōu)化整改,F(xiàn)link與kafka連接異常問(wèn)題解決,但延遲的問(wèn)題還是存在,革命尚未成功,吾輩仍需繼續(xù)深入分析。經(jīng)比對(duì)多天日志,發(fā)現(xiàn)每次任務(wù)重啟前都有checkpoint失敗,ClosedByInterruptException異?,F(xiàn)象
分析及措施:
因?yàn)闃I(yè)務(wù)鏈業(yè)務(wù)量巨大(高峰期每分鐘需處理的數(shù)據(jù)量達(dá)800萬(wàn)左右),在有限flink計(jì)算節(jié)點(diǎn)上(9臺(tái)虛擬機(jī)),按照要求需要滿足幾十個(gè)指標(biāo)在1分鐘內(nèi)不出現(xiàn)延時(shí)生成。當(dāng)任務(wù)重啟后如果從歷史檢查點(diǎn)恢復(fù)處理消費(fèi)數(shù)據(jù),數(shù)據(jù)量積壓概率較高,無(wú)法保障指標(biāo)生成不延時(shí)。所以,重啟處理機(jī)制更改為每次任務(wù)重啟后從當(dāng)前時(shí)間點(diǎn)消費(fèi)kafka 數(shù)據(jù),而非從檢查點(diǎn)開(kāi)始。
關(guān)閉checkpoint后,無(wú)對(duì)應(yīng)異常日志,但指標(biāo)生成延遲問(wèn)題依然存在。
分析結(jié)論:雖然對(duì)該可疑目標(biāo)進(jìn)行了tunning,但延遲依舊存在,進(jìn)一步排除了checkpoint失敗導(dǎo)致指標(biāo)延時(shí)的可能性。
排除以上兩種情況后,繼續(xù)對(duì)flink組件本身的運(yùn)行狀態(tài)做全面綜合深入分析。
分析及措施:
加大并發(fā)數(shù)處理:業(yè)務(wù)鏈kafka topic 是100 partition,正常下游Flink需要開(kāi)100 個(gè)并發(fā)與partition個(gè)數(shù)一一對(duì)應(yīng)起來(lái),如此配置性能才能最優(yōu)。但當(dāng)前flink集群資源有限(flink集群機(jī)器上跑了其它96個(gè)并發(fā)任務(wù)),無(wú)法開(kāi)啟100 個(gè)并發(fā)(經(jīng)測(cè)試最大可開(kāi)啟72 個(gè)并發(fā))。按可用最大并發(fā)配置后,計(jì)算節(jié)點(diǎn)cpu 負(fù)載30%以下,但指標(biāo)仍出現(xiàn)延時(shí),看來(lái)擴(kuò)大并發(fā)數(shù)解決不了延時(shí)問(wèn)題。
線程運(yùn)行狀況
通過(guò)分析程序運(yùn)行狀態(tài),發(fā)現(xiàn)shsncSpanEventStream pre -> Timestamps/Watermarks 這段邏輯有線程鎖現(xiàn)象:
"AsyncIO-Emitter-Thread (trans stream -> Process -> shsncSpanEventStream pre -> Timestamps/Watermarks (28/72))" #181 daemon prio=5 os_prio=0 tid=0x00007f4da3cf8000 nid=0x1324c waiting for monitor entry [0x00007f4d8bdeb000]
java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:125)
- waiting to lock <0x0000000610a5b028> (a java.lang.Object)at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
at java.lang.Thread.run(Thread.java:748)
背壓運(yùn)行狀況
從任務(wù)背壓圖上看,處理延時(shí)是堵在入口解析及下游水位處理環(huán)節(jié)點(diǎn)邏輯上:
優(yōu)化措施:
flink共享waterMaker機(jī)制,在數(shù)據(jù)源進(jìn)行waterMaker注冊(cè),減少邏輯處理N倍;
對(duì)應(yīng)用吐過(guò)來(lái)業(yè)務(wù)數(shù)據(jù)SPAN和SPANEVENT進(jìn)行分流處理,提高程序處理速度;
增加過(guò)濾數(shù)據(jù)邏輯,過(guò)濾掉無(wú)需做指標(biāo)計(jì)算的數(shù)據(jù),減少程序數(shù)據(jù)處理量。
業(yè)務(wù)鏈flink任務(wù)入redis/es拆分出來(lái)做多帶帶算子進(jìn)行入庫(kù)。
任務(wù)并發(fā)數(shù),調(diào)整為50個(gè)并發(fā)數(shù),消費(fèi)kafka topic(topic 100 partition)
實(shí)施以上優(yōu)化措施后,問(wèn)題依舊,延時(shí)并沒(méi)有得到緩解和解決。由于在上一步為了排除checkpoint原因,關(guān)閉了checkpoint,關(guān)閉check后雖然沒(méi)有解決延時(shí)問(wèn)題,但是由于關(guān)閉了checkpoint程序不會(huì)因?yàn)閏heckpoint失敗而停止,因此得以觀察延時(shí)情況下程序gc和堆棧具體使用情況。
gc分析:指標(biāo)延遲時(shí),通過(guò)jstat 監(jiān)測(cè)發(fā)現(xiàn)flink 計(jì)算節(jié)點(diǎn)不斷做FGC,雖然FGC已經(jīng)達(dá)到每1秒一次(FGC時(shí)JVM會(huì)暫停,導(dǎo)致程序卡頓延時(shí)),但是老年代并沒(méi)有任何回收(一直是99.98),因此可以判斷出現(xiàn)了內(nèi)存泄漏,究竟是哪個(gè)位置出現(xiàn)了內(nèi)存泄漏呢?
jmap分析: 通過(guò)jmap查看堆使用排行,驚訝的發(fā)現(xiàn)排在第一是Atomiclong類,占堆內(nèi)存達(dá)到恐怖的2.7G,而我們的代碼并沒(méi)有顯示使用Atomiclong類,排第二的是[C,表示字符串,字符串在程序中用的很多,排第二屬正常,第三還是Atomiclong類,這個(gè)Atomiclong類究竟是哪個(gè)對(duì)象引用的呢?第四是genericonobjectpool,這個(gè)也不正常,程序中連接池對(duì)象竟然有372198個(gè),哪里用得了這么多,還有一個(gè)jedisFactory類,一個(gè)工廠類竟然也有37萬(wàn)個(gè)實(shí)例,也是有問(wèn)題的。
mat分析:
通過(guò)簡(jiǎn)單的jmap命令,發(fā)現(xiàn)很多不正常的類占據(jù)著堆內(nèi)存的前幾名,這些類究竟有什么關(guān)系,誰(shuí)才是罪魁禍?zhǔn)祝恐缓檬钩鑫覀兊慕K極MAT分析大法。
通過(guò)分析導(dǎo)出生成的dump, 整個(gè)dump文件有6.7G,使用32G內(nèi)存的機(jī)器經(jīng)過(guò)10多分鐘的處理,才得到以下分析結(jié)果。
分析報(bào)告顯示ScheduledThreadPoolExecutor對(duì)象持有4.3GB堆內(nèi)存未釋放,堆餅圖中占有97%
點(diǎn)進(jìn)去查看樹(shù)圖,發(fā)現(xiàn)ScheduledThreadPoolExecutor對(duì)象持有4.3GB堆內(nèi)存全部是GenericObjectPool對(duì)象(4.3G,接近1百萬(wàn)個(gè)對(duì)象)
再點(diǎn)擊GenericObjectPool展開(kāi)后發(fā)現(xiàn):
之前通過(guò)jmap分析排行在前的AtomicLong(排第一,占2.7G)和redisFactory類都是躲藏在GenericObjectPool對(duì)象中的。分析至此,本人的第六擼感告訴我,離事情的真相越來(lái)越近了。與redis連接相關(guān)的GenericObjectPool對(duì)象就是問(wèn)題的真兇,內(nèi)存泄漏的對(duì)象。
看到GenericObjectPool連接池對(duì)象不釋放,首先想到的是連redis的連接池去掉。將flink任務(wù)與redis交互的代碼去掉GenericObjectPool連接池,采用直接獲取redisCluseter對(duì)象方式:
(上圖是初始代碼,JedisCluter保存在GenericObjectPool連接池中)
(去掉GenericObjectPool連接池后只獲取JedisCluster對(duì)象)
結(jié)果:?jiǎn)栴}未緩解,未解決,還得繼續(xù)。
由于去掉和redis的連接池未解決問(wèn)題,依然生成大量GenericObjectPool對(duì)象不釋放,一個(gè)推測(cè)是并發(fā)原因?qū)е聠卫龥](méi)有生效,生成了很多個(gè)JedisCluster對(duì)象,而JedisCluster對(duì)象包含了連接池。嘗試synchronized加鎖:
結(jié)果:?jiǎn)栴}仍未緩解,仍未解決,還得繼續(xù)。
上兩步都沒(méi)有進(jìn)展,從頭開(kāi)始分析代碼,代碼分析過(guò)程中發(fā)現(xiàn)flink十多個(gè)任務(wù)都是使用統(tǒng)一的redis初始化方法且只生成單個(gè)redis連接實(shí)例。十多個(gè)flink任務(wù), 每個(gè)flink任務(wù)中又有許多地方需要用到redis連接,redis單例在這里就會(huì)成為瓶頸(數(shù)據(jù)處理不過(guò)來(lái),進(jìn)而積壓)。于是變單例的redisCluseter對(duì)象為多帶帶變量,每個(gè)用到redis連接的類都生成redisCluseter變量,然后再與redis交互,以此使redis隨Flink的連接數(shù)并發(fā)派生。
整改結(jié)果:?jiǎn)栴}得到階段性解決,之前運(yùn)行一天就出現(xiàn)堆和gc問(wèn)題,整改后穩(wěn)定運(yùn)行三天后又出現(xiàn)同樣問(wèn)題。
雖然只穩(wěn)定運(yùn)行三天,但對(duì)筆者和整個(gè)團(tuán)隊(duì)來(lái)說(shuō),也還是很開(kāi)心的,說(shuō)明我們的方向大概率是對(duì)的。但問(wèn)題復(fù)現(xiàn),作為四有好青年的IT民工,咱得發(fā)揚(yáng)不怕苦,不怕累的精神繼續(xù)分析排查。這次排查過(guò)程中發(fā)現(xiàn)眾多的GenericObjectPool是由ScheduledThreadPoolExector引用的,ScheduledThreadPoolExector是一個(gè)異步類,再排查flink中的異步方法。找到AsyncDataStream.unorderedWait()是異步寫入redis方法,將其修改為改造后的官方flink-redis連接包,去除異步。
結(jié)果:?jiǎn)栴}解決,堆和gc一直正常
業(yè)務(wù)鏈指標(biāo)生成正常:
指標(biāo)數(shù)據(jù)量正常:
未發(fā)現(xiàn)有線程鎖現(xiàn)象:
Gc 正常:
1.通過(guò)此次問(wèn)題一波三折的解決過(guò)程,筆者總結(jié)在排查分析處理相關(guān)開(kāi)源組件的性能問(wèn)題時(shí),要充分利用jdk自帶的stat/jmap/jstack等內(nèi)存分析工具及相關(guān)開(kāi)源性能監(jiān)測(cè)工具(如arthas)對(duì)進(jìn)程運(yùn)行狀態(tài)進(jìn)行深入分析,找出性能瓶頸,如死鎖,fgc頻繁等。
2.通過(guò)hadoop web管理界面,自帶背壓監(jiān)測(cè)圖及metrics監(jiān)測(cè)指標(biāo)圖可以查看任務(wù)運(yùn)行現(xiàn)狀。條件充許情況下,建議利用Prometheus工具對(duì)metrics進(jìn)行實(shí)時(shí)監(jiān)測(cè)。
3.結(jié)合日志,分階段分析任務(wù)邏輯存在的性能瓶頸,然后通過(guò)一系列的優(yōu)化措施(拆分/合并/過(guò)濾/異步)提高任務(wù)處理性能。
開(kāi)源組件架構(gòu)的最優(yōu)化使用是基于海量業(yè)務(wù)場(chǎng)景不斷迭代進(jìn)化而來(lái),這個(gè)過(guò)程對(duì)自己對(duì)團(tuán)隊(duì)都是一種歷練和精進(jìn)。在問(wèn)題得到最終解決,性能得到大幅提升,業(yè)務(wù)流暢運(yùn)行后,有種發(fā)自內(nèi)心的會(huì)當(dāng)凌絕頂,一覽眾山小的成就感。最后感謝那些通宵排查問(wèn)題的夜晚和我一起并肩作戰(zhàn)的兄弟們。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/130230.html
摘要:基于在阿里巴巴搭建的平臺(tái)于年正式上線,并從阿里巴巴的搜索和推薦這兩大場(chǎng)景開(kāi)始實(shí)現(xiàn)。在經(jīng)過(guò)一番調(diào)研之后,阿里巴巴實(shí)時(shí)計(jì)算認(rèn)為是一個(gè)非常適合的選擇。接下來(lái),我們聊聊阿里巴巴在層對(duì)又大刀闊斧地進(jìn)行了哪些改進(jìn)。 Apache Flink 概述 Apache Flink(以下簡(jiǎn)稱Flink)是誕生于歐洲的一個(gè)大數(shù)據(jù)研究項(xiàng)目,原名StratoSphere。該項(xiàng)目是柏林工業(yè)大學(xué)的一個(gè)研究性項(xiàng)目,早期...
摘要:另外,將機(jī)制發(fā)揚(yáng)光大,對(duì)有著非常好的支持。系統(tǒng)也注意到并討論了和的問(wèn)題。總結(jié)本文分享了四本相關(guān)的書籍和一份領(lǐng)域相關(guān)的論文列表篇,涉及的設(shè)計(jì),實(shí)現(xiàn),故障恢復(fù),彈性擴(kuò)展等各方面。 前言 之前也分享了不少自己的文章,但是對(duì)于 Flink 來(lái)說(shuō),還是有不少新入門的朋友,這里給大家分享點(diǎn) Flink 相關(guān)的資料(國(guó)外數(shù)據(jù) pdf 和流處理相關(guān)的 Paper),期望可以幫你更好的理解 Flink。...
摘要:第三個(gè)就是比較重點(diǎn)的內(nèi)容,在有贊的實(shí)踐。第四部分是將實(shí)時(shí)計(jì)算化,界面化的一些實(shí)踐。二有贊實(shí)時(shí)平臺(tái)架構(gòu)有贊的實(shí)時(shí)平臺(tái)架構(gòu)呢有幾個(gè)主要的組成部分。實(shí)時(shí)平臺(tái)提供了集群管理,項(xiàng)目管理,任務(wù)管理和報(bào)警監(jiān)控的功能。。 一、前言 這篇主要由五個(gè)部分來(lái)組成: 首先是有贊的實(shí)時(shí)平臺(tái)架構(gòu)。 其次是在調(diào)研階段我們?yōu)槭裁催x擇了 Flink。在這個(gè)部分,主要是 Flink 與 Spark 的 structure...
摘要:第三個(gè)就是比較重點(diǎn)的內(nèi)容,在有贊的實(shí)踐。第四部分是將實(shí)時(shí)計(jì)算化,界面化的一些實(shí)踐。二有贊實(shí)時(shí)平臺(tái)架構(gòu)有贊的實(shí)時(shí)平臺(tái)架構(gòu)呢有幾個(gè)主要的組成部分。實(shí)時(shí)平臺(tái)提供了集群管理,項(xiàng)目管理,任務(wù)管理和報(bào)警監(jiān)控的功能。。 一、前言 這篇主要由五個(gè)部分來(lái)組成: 首先是有贊的實(shí)時(shí)平臺(tái)架構(gòu)。 其次是在調(diào)研階段我們?yōu)槭裁催x擇了 Flink。在這個(gè)部分,主要是 Flink 與 Spark 的 structure...
閱讀 1459·2023-01-11 13:20
閱讀 1811·2023-01-11 13:20
閱讀 1262·2023-01-11 13:20
閱讀 2004·2023-01-11 13:20
閱讀 4225·2023-01-11 13:20
閱讀 2878·2023-01-11 13:20
閱讀 1485·2023-01-11 13:20
閱讀 3805·2023-01-11 13:20