亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

spark Dstreams-程序部署

IT那活兒 / 2369人閱讀
spark Dstreams-程序部署

點擊上方“IT那活兒”,關(guān)注后了解更多內(nèi)容,不管IT什么活兒,干就完了?。?!



01


部署條件



為了運行一個spark streaming應(yīng)用程序,需要滿足以下條件 :

1.1 使用集群管理器管理集群:

這是基本的要求。

1.2 打成jar包:

你必須將你的應(yīng)用程序編譯成jar包,使用spark-submit啟動程序,然而如果你的程序使用的是高級數(shù)據(jù)源(例如kafka),你必須將kafka依賴打進jar包。

1.3 為執(zhí)行節(jié)點配置足夠的內(nèi)存:

因為接收到的數(shù)據(jù)必須保存在內(nèi)存,所以執(zhí)行節(jié)點必須有足夠的內(nèi)存來存儲數(shù)據(jù),如果要執(zhí)行10分鐘的窗口操作,系統(tǒng)必須在內(nèi)存中保留至少10分鐘的數(shù)據(jù),因此應(yīng)用程序的內(nèi)存需求取決于其中使用的操作。

1.4 配置檢查點:

如果流應(yīng)用程序需要,則必須將Hadoop API兼容容錯存儲(例如HDFS、S3等)中的目錄配置為檢查點目錄,并且流應(yīng)用程序的寫入方式應(yīng)確保檢查點信息可用于故障恢復(fù)。

1.5 配置應(yīng)用驅(qū)動程序的的自動重啟:

為了從驅(qū)動程序故障中自動修復(fù),用于運行流應(yīng)用程序的部署基礎(chǔ)結(jié)構(gòu)必須監(jiān)視驅(qū)動程序進程,并在驅(qū)動程序失敗時重新啟動驅(qū)動程序。不同的集群管理器有不同的工具來實現(xiàn)這一點。

  • spark standalone:可以提交spark程序以以spark standalone方式運行,也就是說應(yīng)用程序在一個節(jié)點運行,而且可以指示standalone集群管理器監(jiān)督驅(qū)動程序,如果驅(qū)動程序由于非零退出代碼或運行驅(qū)動程序的節(jié)點故障而失敗,則重新啟動它。

  • YARN:YARN支持自動重啟應(yīng)用程序的類似機制。

  •  Mesos:Marathon已經(jīng)被用來實現(xiàn)這一目標。

1.6 配置預(yù)寫日志(write-ahead logs):

自spark1.2,我們已經(jīng)引入了預(yù)寫日志以實現(xiàn)強大的容錯保證,如果啟用它,所有從receiver接收到的數(shù)據(jù)都會寫入配置檢查點目錄中的預(yù)寫日志。

這可以防止驅(qū)動程序恢復(fù)時的數(shù)據(jù)丟失,從而確保零數(shù)據(jù)丟失,可以通過設(shè)置spark.streaming.receiver.writeAheadLog.enable=true來啟用它,然而這可能以單個接收器的接收吞吐為代價,但是這可以通過并行運行更多接收器來彌補。

此外,建議在啟用預(yù)寫日志時禁用spark內(nèi)接收數(shù)據(jù)的復(fù)制,因為該日志已存儲在已復(fù)制的存儲系統(tǒng)中,這可以通過設(shè)置存儲級別為StorageLevel.MEMORY_AND_DISK_SER來實現(xiàn),使用S3(或任何不支持刷新的文件系統(tǒng))進行預(yù)寫日志時,請記住啟用:

spark.streaming.driver.writeAheadLog.closeFileAfterWrite

spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。

1.7 設(shè)置最大接收速率:

如果集群資源不夠大,spark streaming應(yīng)用程序無法以接收數(shù)據(jù)的速度處理數(shù)據(jù),則可以通過設(shè)置記錄的最大速率限制來限制接收器的速率,請參閱:

  • 接收器的配置參數(shù)spark.streaming.receiver.maxRate

  • 直接kafka方法的配置參數(shù)spark.streaming.kafka.maxRatePerPartition

在Spark 1.5中,我們引入了一種稱為背壓的功能,它消除了設(shè)置此速率限制的需要,因為Spark Streaming會自動計算速率限制,并在處理條件發(fā)生變化時動態(tài)調(diào)整速率限制。可通過將配置參數(shù)spark.streaming.backpressure.enabled設(shè)置為true來啟用此背壓。


02


升級應(yīng)用程序代碼


如果你需要升級spark streaming應(yīng)用程序代碼,有兩種可能的機制。
2.1 升級后的Spark Streaming應(yīng)用程序?qū)?,并與現(xiàn)有應(yīng)用程序并行運行。一旦新的(接收到與舊的相同的數(shù)據(jù))被預(yù)熱并準備好進入黃金時段,舊的就可以被取下。
2.2 現(xiàn)有應(yīng)用程序正常關(guān)閉(有關(guān)正常關(guān)閉選項,請參閱StreamingContext.stop(…)或JavaStreamingContext.stop(…),以確保在關(guān)閉之前完全處理已接收的數(shù)據(jù)。然后可以啟動升級后的應(yīng)用程序,該應(yīng)用程序?qū)脑缙趹?yīng)用程序停止的同一點開始處理。
請注意,這只能通過支持源端緩沖(如Kafka)的輸入源來實現(xiàn),因為在上一個應(yīng)用程序關(guān)閉且升級的應(yīng)用程序尚未啟動時,需要緩沖數(shù)據(jù)。無法從升級前代碼的早期檢查點信息重新啟動。
檢查點信息實質(zhì)上包含序列化的Scala/Java/Python對象,嘗試使用新的、修改過的類反序列化對象可能會導(dǎo)致錯誤。
在這種情況下,使用不同的檢查點目錄啟動升級的應(yīng)用程序,或者刪除以前的檢查點目錄。


03


監(jiān)控應(yīng)用程序


除了Spark的監(jiān)控功能外,還有Spark streaming特有的其他功能。
使用StreamingContext時,Spark web UI會顯示一個附加的流選項卡,其中顯示有關(guān)正在運行的接收器(接收器是否處于活動狀態(tài)、接收到的記錄數(shù)、接收器錯誤等)和已完成批次(批處理時間、排隊延遲等)的統(tǒng)計信息。這可用于監(jiān)視流應(yīng)用程序的進度。
Web UI中的兩個指標特別重要:
  • processing time:處理每個批次花費的時間
  • Scheduling Delay:批在隊列里等待前一批處理完成的時間
如果批次處理時間始終大于批次間隔和/或排隊延遲持續(xù)增加,則表明系統(tǒng)無法以生成批次的速度處理批次,并且正在落后。在這種情況下,考慮減少批處理時間。
還可以使用StreamingListener接口監(jiān)控Spark streaming程序的進度,該接口允許您獲取接收器狀態(tài)和處理時間。請注意,這是一個開發(fā)人員API,將來可能會對其進行改進(即報告更多信息)。


04


性能調(diào)整


要從集群上的Spark流媒體應(yīng)用程序中獲得最佳性能,需要進行一些調(diào)整。
本節(jié)介紹了一些可以調(diào)整以提高應(yīng)用程序性能的參數(shù)和配置。在高層次上,你需要考慮兩件事:
  • 通過高效地使用群集資源,減少每批數(shù)據(jù)的處理時間。
  • 設(shè)置正確的批大小,以便可以在接收數(shù)據(jù)時盡快處理數(shù)據(jù)批(即,數(shù)據(jù)處理與數(shù)據(jù)攝取保持同步)。
減少批處理時間可以在Spark中進行許多優(yōu)化,以最大限度地縮短每個批次的處理時間。
下面重點介紹了一些最重要的問題。
4.1 數(shù)據(jù)接收中的并行級別
通過網(wǎng)絡(luò)接收數(shù)據(jù)(如kafka,socket等)需要將數(shù)據(jù)反序列化并存儲到spark中,如果數(shù)據(jù)接收成為系統(tǒng)中的瓶頸,那么考慮數(shù)據(jù)接收的并行化。
請注意,每個輸入數(shù)據(jù)流都會創(chuàng)建一個接收單個數(shù)據(jù)流的接收器(在工作機器上運行)。因此,通過創(chuàng)建多個輸入數(shù)據(jù)流并將其配置為從源接收數(shù)據(jù)流的不同分區(qū),可以實現(xiàn)接收多個數(shù)據(jù)流。
例如,接收兩個主題數(shù)據(jù)的單個kafka輸入數(shù)據(jù)流可以分成兩個kafka輸入流,每個kafka輸入流只接收一個主題。這將運行兩個接收器,允許并行接收數(shù)據(jù),從而提高了總體吞吐量。
這些多個數(shù)據(jù)流可以聯(lián)合在一起以創(chuàng)建單個數(shù)據(jù)流。然后,應(yīng)用于單個輸入數(shù)據(jù)流的轉(zhuǎn)換可以應(yīng)用于統(tǒng)一流??梢赃@樣做:
應(yīng)考慮的另一個參數(shù)是接收器的塊間隔,它由配置參數(shù)spark.streaming.blockInterval確定。對于大多數(shù)接收器,接收到的數(shù)據(jù)在存儲到Spark的內(nèi)存中之前會合并成數(shù)據(jù)塊。
每個批處理中的塊數(shù)決定了在類似映射的轉(zhuǎn)換中用于處理接收數(shù)據(jù)的任務(wù)數(shù)。每批每個接收器的任務(wù)數(shù)大約為(批間隔/塊間隔)。
例如,200 ms的塊間隔將每2秒批創(chuàng)建10個任務(wù)。如果任務(wù)數(shù)量太少(即,少于每臺機器的核心數(shù)量),那么它將是低效的,因為所有可用的核心都不會用于處理數(shù)據(jù)。要增加給定批處理間隔的任務(wù)數(shù),請減少塊間隔。但是,建議的最小塊間隔值約為50 ms,低于該值,任務(wù)啟動開銷可能會出現(xiàn)問題。
使用多個輸入流/接收器接收數(shù)據(jù)的另一種方法是顯式地重新劃分輸入數(shù)據(jù)流(使用inputStream.repartition())。這將在進一步處理之前在群集中指定數(shù)量的計算機上分發(fā)接收到的數(shù)據(jù)批。
4.2 數(shù)據(jù)處理中的并行級別
如果在計算的任何階段中使用的并行任務(wù)的數(shù)量不夠多,那么集群資源可能會利用不足。例如,對于reduceByKey和ReduceByAndWindow等分布式reduce操作,并行任務(wù)的默認數(shù)量由spark.default.parallelism配置屬性控制。
您可以將并行級別作為參數(shù)傳遞(請參閱PairDStreamFunctions文檔),或者設(shè)置spark.default.parallelism配置屬性以更改默認值。
4.3 數(shù)據(jù)序列化
通過調(diào)整序列化格式,可以減少數(shù)據(jù)序列化的開銷。在流式傳輸?shù)那闆r下,有兩種類型的數(shù)據(jù)可以被序列化。
  • 輸入數(shù)據(jù)(input data):
    默認情況下,通過接收器接收的輸入數(shù)據(jù)存儲在具有StorageLevel.memory_DISK_SER_2的執(zhí)行器內(nèi)存中。也就是說,數(shù)據(jù)序列化為字節(jié)以減少GC開銷,并復(fù)制以容忍執(zhí)行器故障。
    此外,數(shù)據(jù)首先保存在內(nèi)存中,只有當(dāng)內(nèi)存不足以保存流計算所需的所有輸入數(shù)據(jù)時,才會溢出到磁盤。這種序列化顯然有開銷——接收器必須對接收到的數(shù)據(jù)進行反序列化,并使用Spark的序列化格式對其重新序列化。
  • 流操作生成的持久化RDD:
    流式計算生成的RDD可以保存在內(nèi)存中。例如,窗口操作將數(shù)據(jù)保存在內(nèi)存中,因為它們將被多次處理。但是,與Spark Core默認的StorageLevel.MEMORY_ONLY不同,流式計算生成的持久化RDD默認使用StorageLevel.MEMORY_ONLY_DISK(即序列化)持久化,以最小化GC開銷。
在這兩種情況下,使用Kryo序列化可以減少CPU和內(nèi)存開銷。
在流應(yīng)用程序需要保留的數(shù)據(jù)量不大的特定情況下,可以將數(shù)據(jù)(兩種類型)作為反序列化對象持久化,而不會產(chǎn)生過多的GC開銷。
例如,如果使用幾秒鐘的批處理間隔且沒有窗口操作,則可以通過顯式設(shè)置相應(yīng)的存儲級別來嘗試禁用持久化數(shù)據(jù)中的序列化。這將減少由于序列化而產(chǎn)生的CPU開銷,從而有可能在沒有太多GC開銷的情況下提高性能。
4.4 任務(wù)啟動開銷
如果每秒啟動的任務(wù)數(shù)很高(例如,每秒50個或更多),則向執(zhí)行者發(fā)送任務(wù)的開銷可能很大,并且將很難實現(xiàn)亞秒延遲。
通過以下更改可以減少開銷:
執(zhí)行模式:在standalone模式或粗粒度Mesos模式下運行Spark會導(dǎo)致比細粒度Mesos模式更好的任務(wù)啟動時間。
這些更改可能會將批處理時間減少100毫秒,從而允許使用次秒級的批處理大小。
4.5 設(shè)置正確的批處理間隔
為了使在集群上運行的Spark Streaming應(yīng)用程序保持穩(wěn)定,系統(tǒng)應(yīng)該能夠以接收數(shù)據(jù)的速度處理數(shù)據(jù)。
換句話說,批處理速度應(yīng)該與接收數(shù)據(jù)速度一樣快,可以通過web UI監(jiān)控批數(shù)據(jù)處理時間,其應(yīng)該小于批處理間隔。
根據(jù)流計算的性質(zhì),所使用的批處理間隔可能會對應(yīng)用程序在一組固定的群集資源上可以維持的數(shù)據(jù)速率產(chǎn)生重大影響。
例如,讓我們考慮較早的WorddCurnNead示例。對于特定的數(shù)據(jù)速率,系統(tǒng)可能能夠每2秒(即2秒的批處理間隔)跟蹤報告字數(shù),但不是每500毫秒一次。因此,需要設(shè)置批次間隔,以便能夠維持生產(chǎn)中的預(yù)期數(shù)據(jù)速率。
為應(yīng)用程序確定正確的批處理大小的一個好方法是使用保守的批處理間隔(例如,5-10秒)和低數(shù)據(jù)速率對其進行測試。
為了驗證系統(tǒng)是否能夠跟上數(shù)據(jù)速率,您可以檢查每個處理批次所經(jīng)歷的端到端延遲值(在Spark driver log4j日志中查找“總延遲”,或使用StreamingListener接口)。
如果延遲保持與批量大小相當(dāng),則系統(tǒng)是穩(wěn)定的。否則,如果延遲持續(xù)增加,則意味著系統(tǒng)無法跟上,因此不穩(wěn)定。一旦你有了一個穩(wěn)定配置的想法,你可以嘗試增加數(shù)據(jù)速率和/或減少批量大小。
請注意,只要延遲降低回較低值(即,小于批量大小),由于臨時數(shù)據(jù)速率增加而導(dǎo)致的延遲瞬時增加就可以了。
4.6 內(nèi)存調(diào)整
我們將專門討論Spark流應(yīng)用程序上下文中的一些調(diào)優(yōu)參數(shù)。
Spark流應(yīng)用程序所需的集群內(nèi)存量在很大程度上取決于所使用的轉(zhuǎn)換類型。例如,如果要對最后10分鐘的數(shù)據(jù)使用窗口操作,那么集群應(yīng)該有足夠的內(nèi)存保留10分鐘的數(shù)據(jù)。
或者,如果您想使用帶有大量鍵的updateStateByKey,那么所需的內(nèi)存將很高。相反,如果要執(zhí)行簡單的映射過濾器存儲操作,則所需內(nèi)存將較低。
通常,由于通過接收器接收的數(shù)據(jù)存儲在StorageLevel.MEMORY_AND_DISK_SER_2中,因此超過內(nèi)存的數(shù)據(jù)將溢出到磁盤。這可能會降低streaming應(yīng)用程序的性能。
因此建議根據(jù)streaming應(yīng)用程序的要求提供足夠的內(nèi)存。最好嘗試在小范圍內(nèi)查看內(nèi)存使用情況,并進行相應(yīng)的估計。
內(nèi)存調(diào)優(yōu)的另一個方面是垃圾收集。對于需要低延遲的流應(yīng)用程序,不希望JVM垃圾收集導(dǎo)致暫停。
有幾個參數(shù)可以幫助您調(diào)整內(nèi)存使用和GC開銷:
  • 數(shù)據(jù)流的存儲級別:
    imput data數(shù)據(jù)和RDD在默認情況下作為序列化字節(jié)持久化。與反序列化持久化相比,這減少了內(nèi)存使用和GC開銷。啟用Kryo序列化進一步減少了序列化大小和內(nèi)存使用。
  • 清除舊數(shù)據(jù):
    默認情況下,由數(shù)據(jù)流轉(zhuǎn)換生成的所有輸入數(shù)據(jù)和持久化RDD將自動清除。Spark Streaming根據(jù)所使用的轉(zhuǎn)換決定何時清除數(shù)據(jù)。例如,如果使用10分鐘的窗口操作,則Spark Streaming將保留最后10分鐘的數(shù)據(jù),并主動丟棄較舊的數(shù)據(jù)。通過設(shè)置streamingContext.remember,數(shù)據(jù)可以保留更長的時間。
  • CMS垃圾收集器:
    強烈建議使用并發(fā)標記和掃描GC,以保持GC相關(guān)暫停始終較低。盡管已知并發(fā)GC會降低系統(tǒng)的總體處理吞吐量,但仍建議使用它來實現(xiàn)更一致的批處理時間。確保在驅(qū)動程序(使用spark submit中的--driver java選項)和執(zhí)行器(使用spark配置spark.executor.extraJavaOptions)上設(shè)置CMS GC。


05


容  錯


接下來討論在spark streaming應(yīng)用程序中發(fā)生故障時行為。
5.1 RDD容錯語義
為了理解spark streaming的容錯語義,讓我們先看下RDD的基本容錯語義。
  • RDD是一個不可變的、確定的和可重新計算的分布式數(shù)據(jù)集,每個RDD都會記住創(chuàng)建數(shù)據(jù)集的依賴。
  • 如果RDD的任何分區(qū)由于工作節(jié)點故障而丟失,則可以使用操作依賴關(guān)系從原始容錯數(shù)據(jù)集重新計算該分區(qū)。
  • 假設(shè)所有的RDD轉(zhuǎn)換都是確定性的,那么不管Spark集群中是否出現(xiàn)故障,最終轉(zhuǎn)換的RDD中的數(shù)據(jù)都將始終相同。
Spark對HDFS或S3等容錯文件系統(tǒng)中的數(shù)據(jù)進行操作。
因此,從容錯數(shù)據(jù)生成的所有RDD也是容錯的。但是,Spark streaming的情況并非如此,因為在大多數(shù)情況下,數(shù)據(jù)是通過網(wǎng)絡(luò)接收的(使用fileStream時除外)。
為了為所有生成的RDD實現(xiàn)相同的容錯屬性,在群集中工作節(jié)點的多個Spark執(zhí)行器之間復(fù)制接收到的數(shù)據(jù)(默認復(fù)制系數(shù)為2)。
這導(dǎo)致系統(tǒng)中出現(xiàn)兩種數(shù)據(jù),在發(fā)生故障時需要恢復(fù):
  • 接收和復(fù)制的數(shù)據(jù)—此數(shù)據(jù)在單個工作節(jié)點發(fā)生故障時仍然有效,因為它的副本存在于另一個節(jié)點上。
  • 已接收但已緩沖用于復(fù)制的數(shù)據(jù)—由于未復(fù)制此數(shù)據(jù),因此恢復(fù)此數(shù)據(jù)的唯一方法是再次從源獲取它。
還有兩種故障需要注意:
  • 運行executors的任何工作節(jié)點都可能失敗,并且這些節(jié)點上的所有內(nèi)存中數(shù)據(jù)都將丟失。如果任何接收器在發(fā)生故障的節(jié)點上運行,則其緩沖數(shù)據(jù)將丟失。
  • 如果運行Spark Streaming應(yīng)用程序的驅(qū)動程序節(jié)點出現(xiàn)故障,則SparkContext顯然會丟失,所有executor及其內(nèi)存中的數(shù)據(jù)也會丟失。
5.2 Spark streaming容錯語義
5.2.1 定義
在所有可能的操作條件下(盡管出現(xiàn)故障等),系統(tǒng)可以提供三種類型的保證:
  • 最多一次:每條記錄要么處理一次,要么根本不處理。
  • 至少一次:每條記錄將被處理一次或多次。這比最多一次強,因為它確保不會丟失任何數(shù)據(jù)。但也可能有重復(fù)。
  • 只有一次:每條記錄將精確處理一次-不會丟失任何數(shù)據(jù),也不會多次處理任何數(shù)據(jù)。這顯然是三者中最有力的保證。
5.2.2 基本語義
在任何流處理系統(tǒng)中,廣義地說,處理數(shù)據(jù)有三個步驟。
  • 接收數(shù)據(jù):使用接收器或其他方式從源接收數(shù)據(jù)。
  • 轉(zhuǎn)換數(shù)據(jù):使用數(shù)據(jù)流和RDD轉(zhuǎn)換處理接收的數(shù)據(jù)。
  • 輸出數(shù)據(jù):最終轉(zhuǎn)換的數(shù)據(jù)輸出到外部系統(tǒng),如文件系統(tǒng)、數(shù)據(jù)庫、儀表板等。
如果流應(yīng)用程序必須實現(xiàn)端到端的精確一次保證,那么每個步驟都必須提供精確一次的保證。也就是說,每個記錄必須準確接收一次,準確轉(zhuǎn)換一次,并準確推送到下游系統(tǒng)一次。讓我們在Spark流的上下文中理解這些步驟的語義。
  • 接收數(shù)據(jù):不同的輸入源提供不同的保證,文章后面將詳細討論。
  • 轉(zhuǎn)換數(shù)據(jù):由于RDD提供的保證,所有接收到的數(shù)據(jù)都將被精確地處理一次。即使出現(xiàn)故障,只要接收到的輸入數(shù)據(jù)是可訪問的,最終轉(zhuǎn)換的RDD將始終具有相同的內(nèi)容。
  • 推出數(shù)據(jù):默認情況下,輸出操作至少確保一次語義,因為它取決于輸出操作的類型(冪等式或非冪等式)和下游系統(tǒng)的語義(是否支持事務(wù))。但用戶可以實現(xiàn)自己的事務(wù)機制,以實現(xiàn)精確的一次性語義。文章后面將詳細討論這一點。
5.2.3 接收數(shù)據(jù)語義
不同的輸入源提供不同的保證,從至少一次到恰好一次不等。
File source
如果所有輸入數(shù)據(jù)都已存在于HDFS等容錯文件系統(tǒng)中,Spark Streaming始終可以從任何故障中恢復(fù)并處理所有數(shù)據(jù)。這給出了精確的一次語義,這意味著所有數(shù)據(jù)將被精確地處理一次,而不管什么失敗。
基于接收器的source
對于基于接收器的輸入源,容錯語義取決于故障場景和接收器類型。如前所述,有兩種類型的接收器:
  • 可靠接收器-這些接收器僅在確保已復(fù)制接收到的數(shù)據(jù)后才確認可靠來源。如果這樣的接收器出現(xiàn)故障,源將不會收到緩沖(未復(fù)制)數(shù)據(jù)的確認。
    因此,如果接收器重新啟動,源將重新發(fā)送數(shù)據(jù),并且不會因故障而丟失任何數(shù)據(jù)。
  • 不可靠的接收器-此類接收器不發(fā)送確認,因此在由于工作人員或驅(qū)動程序故障而失敗時可能會丟失數(shù)據(jù)。
根據(jù)所使用的接收器類型,我們實現(xiàn)了以下語義:
如果工作節(jié)點發(fā)生故障,則可靠的接收器不會丟失數(shù)據(jù)。對于不可靠的接收器,接收到但未復(fù)制的數(shù)據(jù)可能會丟失。
如果驅(qū)動程序節(jié)點發(fā)生故障,那么除了這些丟失之外,在內(nèi)存中接收和復(fù)制的所有過去的數(shù)據(jù)都將丟失。這將影響有狀態(tài)轉(zhuǎn)換的結(jié)果。
為了避免丟失過去接收到的數(shù)據(jù),Spark 1.2引入了預(yù)寫日志,將接收到的數(shù)據(jù)保存到容錯存儲器中。
通過啟用預(yù)寫日志和可靠的接收器,可以實現(xiàn)零數(shù)據(jù)丟失。在語義方面,它提供了至少一次的保證。
下表總結(jié)了故障下的語義:
在Spark 1.3中,我們引入了一個新的Kafka Direct API,它可以確保Spark Streaming只接收一次所有Kafka數(shù)據(jù)。此外,如果實現(xiàn)一次輸出操作,則可以實現(xiàn)端到端的一次輸出保證。
5.2.4 輸出操作語義
輸出操作(比如foreachRDD)至少有一次語義,也就是說,如果工作程序發(fā)生故障,轉(zhuǎn)換后的數(shù)據(jù)可能會多次寫入外部實體。
雖然這對于使用saveAs***Files操作保存到文件系統(tǒng)是可以接受的(因為文件將被相同的數(shù)據(jù)覆蓋),但是可能需要額外的努力來實現(xiàn)一次語義。
有兩種方法:
  • 冪等更新:多次嘗試總是寫入相同的數(shù)據(jù)。例如,saveAs***文件總是將相同的數(shù)據(jù)寫入生成的文件。
  • 事務(wù)性更新:所有更新都是以事務(wù)方式進行的,因此更新只以原子方式進行一次。實現(xiàn)這一點的一種方法是:
使用批處理時間(在foreachRDD中可用)和RDD的分區(qū)索引來創(chuàng)建標識符。此標識符唯一標識流應(yīng)用程序中的數(shù)據(jù)。
使用標識符以事務(wù)方式(即原子方式)更新外部系統(tǒng)。也就是說,如果標識符尚未提交,則以原子方式提交分區(qū)數(shù)據(jù)和標識符。否則,如果已提交,則跳過更新。


06


注  意


6.1 一個數(shù)據(jù)流與一個接收器相關(guān)聯(lián)。
為了實現(xiàn)讀取并行性,需要創(chuàng)建多個接收器,即多個數(shù)據(jù)流。接收器在執(zhí)行器中運行。它占據(jù)一個核心。確保在預(yù)訂接收器插槽后有足夠的內(nèi)核進行處理,即spark.cores.max應(yīng)考慮接收器插槽。接收器以循環(huán)方式分配給執(zhí)行者。
6.2 當(dāng)從流源接收數(shù)據(jù)時,接收器創(chuàng)建數(shù)據(jù)塊。
每個塊間隔生成一個新的數(shù)據(jù)塊。在N個塊間隔內(nèi)創(chuàng)建N個數(shù)據(jù)塊。這些塊由當(dāng)前executor的塊管理器分配給其他executor的塊管理器。之后,驅(qū)動程序上運行的網(wǎng)絡(luò)輸入跟蹤器將被告知塊位置,以便進一步處理。
6.3 在驅(qū)動程序(driver)上為批處理間隔期間創(chuàng)建的塊創(chuàng)建RDD。
批處理間隔期間生成的塊是分區(qū)RDD。每個分區(qū)都是spark中的一項任務(wù)。塊間隔(blockInterval)==批處理間隔(batchinterval)意味著創(chuàng)建了單個分區(qū),并且可能在本地對其進行處理。
6.4 塊上的map任務(wù)在具有塊的executor(一個接收塊,另一個復(fù)制塊)中處理,而不考慮塊間隔,除非非本地調(diào)度開始。
擁有更大的區(qū)塊間隔意味著更大的區(qū)塊。較高的spark.locality.wait值會增加在本地節(jié)點上處理塊的機會。需要在這兩個參數(shù)之間找到平衡,以確保在本地處理較大的塊。
6.5 您可以通過調(diào)用inputDstream.repartition(n)來定義分區(qū)的數(shù)量,而不是依賴于batchInterval和blockInterval。
這將隨機重新排列RDD中的數(shù)據(jù),以創(chuàng)建n個分區(qū)。是的,為了更大的并行性。雖然是以洗牌為代價的。RDD的處理由驅(qū)動程序的jobscheduler作為作業(yè)進行調(diào)度。在給定的時間點,只有一個作業(yè)處于活動狀態(tài)。因此,如果一個作業(yè)正在執(zhí)行,其他作業(yè)將排隊。
6.6 如果有兩個數(shù)據(jù)流,將形成兩個RDD,并將創(chuàng)建兩個作業(yè),這兩個作業(yè)將一個接一個地安排。
為了避免這種情況,可以合并兩個數(shù)據(jù)流。這將確保為數(shù)據(jù)流的兩個RDD形成一個unionRDD。然后將此unionRDD視為單個作業(yè)。但是,RDD的分區(qū)不受影響。
6.7 如果批處理時間超過batchinterval,那么很明顯,接收器的內(nèi)存將開始填滿,并最終引發(fā)異常(很可能是BlockNotFoundException)。
目前,無法暫停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。



本文作者:潘宗昊

本文來源:IT那活兒(上海新炬王翦團隊)

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/129642.html

相關(guān)文章

  • SparkStreaming概述

    摘要:但在企業(yè)中存在很多實時性處理的需求,例如雙十一的京東阿里,通常會做一個實時的數(shù)據(jù)大屏,顯示實時訂單。這種情況下,對數(shù)據(jù)實時性要求較高,僅僅能夠容忍到延遲分鐘或幾秒鐘。1 Spark Streaming是什么它是一個可擴展,高吞吐具有容錯性的流式計算框架吞吐量:單位時間內(nèi)成功傳輸數(shù)據(jù)的數(shù)量之前我們接觸的spark-core和spark-sql都是處理屬于離線批處理任務(wù),數(shù)據(jù)一般都是在固定位置上...

    Tecode 評論0 收藏0
  • Spark Streaming學(xué)習(xí)筆記

    摘要:輸入和接收器輸入代表從某種流式數(shù)據(jù)源流入的數(shù)據(jù)流。文件數(shù)據(jù)流可以從任何兼容包括等的文件系統(tǒng),創(chuàng)建方式如下將監(jiān)視該目錄,并處理該目錄下任何新建的文件目前還不支持嵌套目錄。會被一個個依次推入隊列,而則會依次以數(shù)據(jù)流形式處理這些的數(shù)據(jù)。 特點: Spark Streaming能夠?qū)崿F(xiàn)對實時數(shù)據(jù)流的流式處理,并具有很好的可擴展性、高吞吐量和容錯性。 Spark Streaming支持從多種數(shù)...

    陸斌 評論0 收藏0
  • Spark 快速入門

    摘要:數(shù)據(jù)科學(xué)任務(wù)主要是數(shù)據(jù)分析領(lǐng)域,數(shù)據(jù)科學(xué)家要負責(zé)分析數(shù)據(jù)并建模,具備統(tǒng)計預(yù)測建模機器學(xué)習(xí)等方面的經(jīng)驗,以及一定的使用或語言進行編程的能力。監(jiān)控運行時性能指標信息。 Spark Spark 背景 什么是 Spark 官網(wǎng):http://spark.apache.org Spark是一種快速、通用、可擴展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開源,20...

    wangshijun 評論0 收藏0
  • IntelliJ IDEA Windows下Spark開發(fā)環(huán)境部署

    摘要:運行數(shù)據(jù)準備隨便準備一個文檔格式不限,上傳到上。解決因為原來是用的版本為相應(yīng)的依賴包官網(wǎng)已經(jīng)不再支持,所以更新的平臺的環(huán)境為,相應(yīng)的文檔很少,更改版本為。星期六星期一由為知筆記遷移到。 0x01 環(huán)境說明 博客地址:http://www.cnblogs.com/ning-wang/p/7359977.html 1.1 本地 OS: windows 10jdk: jdk1.8.0_121...

    DevWiki 評論0 收藏0
  • Spark入門階段一之掃盲筆記

    摘要:同時集成了機器學(xué)習(xí)類庫?;谟嬎憧蚣?,將的分布式計算應(yīng)用到機器學(xué)習(xí)領(lǐng)域。提供了一個簡單的聲明方法指定機器學(xué)習(xí)任務(wù),并且動態(tài)地選擇最優(yōu)的學(xué)習(xí)算法。宣稱其性能是的多倍。 介紹 spark是分布式并行數(shù)據(jù)處理框架 與mapreduce的區(qū)別: mapreduce通常將中間結(jié)果放在hdfs上,spark是基于內(nèi)存并行大數(shù)據(jù)框架,中間結(jié)果放在內(nèi)存,對于迭代數(shù)據(jù)spark效率更高,mapred...

    starsfun 評論0 收藏0

發(fā)表評論

0條評論

IT那活兒

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<