亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專(zhuān)欄INFORMATION COLUMN

spark Dstreams-kafka數(shù)據(jù)源

IT那活兒 / 559人閱讀
spark Dstreams-kafka數(shù)據(jù)源

點(diǎn)擊上方“IT那活兒”,關(guān)注后了解更多內(nèi)容,不管IT什么活兒,干就完了?。?!


01


簡(jiǎn)   介


Spark Streaming+Kafka集成在實(shí)際應(yīng)用中是非常常見(jiàn)的,其中kafka需要是0.10.0版本及以上。Kafka 0.10的Spark Streaming集成提供了簡(jiǎn)單的并行性、Kafka分區(qū)和Spark分區(qū)之間的1:1對(duì)應(yīng)關(guān)系以及對(duì)偏移量和元數(shù)據(jù)的訪問(wèn)。
但是,由于較新的集成使用了新的Kafka consumer API而不是簡(jiǎn)單的API,因此在使用上存在顯著差異。


02


案例及說(shuō)明


首先需要添加依賴(lài):
Stream中的每條記錄是一個(gè)ConsumerRecord實(shí)體。如果Spark batch持續(xù)時(shí)間大于默認(rèn)的Kafka心跳會(huì)話超時(shí)(30秒),請(qǐng)適當(dāng)增加heartbeat.interval.ms和session.timeout.ms。
對(duì)于大于5分鐘的批處理,這將需要在代理上更改group.max.session.timeout.ms。
新的Kafka消費(fèi)API將把消息預(yù)取到緩沖區(qū)中。因此,出于性能原因,Spark integration將緩存的使用者保留在執(zhí)行器上(而不是為每個(gè)批處理重新創(chuàng)建它們),并且更愿意在具有適當(dāng)使用者的主機(jī)位置上調(diào)度分區(qū),這一點(diǎn)很重要。
在大多數(shù)情況下,您應(yīng)該使用LocationStrategies.PreferConsistent,如上所示。這將在可用的執(zhí)行器之間均勻地分配分區(qū)。如果您的執(zhí)行者與您的Kafka代理位于相同的主機(jī)上,請(qǐng)使用PreferBrokers,這將更傾向于在Kafka leader上為該分區(qū)安排分區(qū)。
最后,如果分區(qū)之間的負(fù)載有明顯的偏差,請(qǐng)使用PreferFixed。這允許您指定分區(qū)到主機(jī)的顯式映射(任何未指定的分區(qū)都將使用一致的位置)。
消費(fèi)者的緩存的默認(rèn)最大大小為64.如果您希望處理超過(guò)(64 *個(gè)執(zhí)行程序數(shù))Kafka分區(qū),則可以通過(guò)spark.streaming.kafka.consumer.cache.maxCapacity更改此設(shè)置。
如果要禁用Kafka使用者的緩存,可以將spark.streaming.Kafka.consumer.cache.enabled設(shè)置為false。
緩存由topicpartition和group.id設(shè)置密鑰,因此對(duì)createDirectStream的每次調(diào)用使用多帶帶的group.id。
新的Kafka consumer API有許多不同的方法來(lái)指定主題,其中一些方法需要大量的對(duì)象實(shí)例化后設(shè)置。ConsumerStrategies提供了一個(gè)抽象,允許Spark即使在從檢查點(diǎn)重新啟動(dòng)后也能獲得正確配置的使用者。
如上所示,Subscribe允許您訂閱固定的主題集合。SubscribePattern允許您使用正則表達(dá)式指定感興趣的主題。請(qǐng)注意,與0.8集成不同,使用Subscribe或SubscribePattern應(yīng)該響應(yīng)在運(yùn)行流期間添加分區(qū)。最后,Assign允許您指定一個(gè)固定的分區(qū)集合這三種策略都有重載構(gòu)造函數(shù),允許您指定特定分區(qū)的起始偏移量。
如果你有一個(gè)更適合批處理的用例,那么可以創(chuàng)建RDD來(lái)定義偏移范圍:
獲取偏移量:
請(qǐng)注意,只有在createDirectStream結(jié)果上調(diào)用的第一個(gè)方法中,而不是在隨后的方法鏈中,才能成功地將類(lèi)型轉(zhuǎn)換為HasOffsetRanges。
請(qǐng)注意,RDD分區(qū)和Kafka分區(qū)之間的一對(duì)一映射在任何洗牌或重新分區(qū)的方法(例如reduceByKey()或window())之后都不會(huì)保留。


03


偏移量管理


kafka在失敗情況下傳輸語(yǔ)義取決于偏移量的存儲(chǔ)方式和存儲(chǔ)時(shí)間,spark輸出操作至少一次,如果你想只有一次輸出,則必須在冪等輸出后存儲(chǔ)偏移量,或者在原子事務(wù)中與輸出一起存儲(chǔ)偏移量,通過(guò)這種集成,為了提高可靠性,您有三個(gè)選項(xiàng)來(lái)存儲(chǔ)偏移量。
1) Checkpoint
如果啟用checkpointing,偏移量將存儲(chǔ)在檢查點(diǎn)中。這很容易實(shí)現(xiàn),但也有缺點(diǎn)。您的輸出操作必須是冪等的,因?yàn)槟鷮⒌玫街貜?fù)的輸出;交易不是一種選擇。此外,如果應(yīng)用程序代碼已更改,則無(wú)法從檢查點(diǎn)恢復(fù)。對(duì)于計(jì)劃的升級(jí),您可以通過(guò)在舊代碼的同時(shí)運(yùn)行新代碼來(lái)緩解這一問(wèn)題(因?yàn)檩敵鰺o(wú)論如何都需要是冪等的,所以它們不應(yīng)該沖突)。但對(duì)于需要更改代碼的計(jì)劃外故障,您將丟失數(shù)據(jù),除非您有另一種方法來(lái)識(shí)別已知良好的起始偏移量。
2)kafka自己管理偏移量
Kafka有一個(gè)特殊的topic用來(lái)存儲(chǔ)偏移量,默認(rèn)情況下,消費(fèi)者會(huì)自動(dòng)定期提交偏移量,但是這肯定不是你想要的,因?yàn)檩喸?xún)期間消息可能還未輸出,這就是上面的流示例將“enable.auto.commit”設(shè)置為false的原因,但是在知道輸出已存儲(chǔ)后,可以手動(dòng)將偏移提交到Kafka,與檢查點(diǎn)相比,Kafka的好處在于無(wú)論應(yīng)用程序代碼如何更改,它都是一個(gè)持久的存儲(chǔ)。然而,卡夫卡不是事務(wù)性的,所以您的輸出仍然必須是冪等的。
3)自定義存儲(chǔ)
對(duì)于支持事務(wù)的數(shù)據(jù)存儲(chǔ),將偏移保存在與結(jié)果相同的事務(wù)中可以使兩者保持同步,即使在失敗的情況下也是如此。
如果在檢測(cè)重復(fù)或跳過(guò)的偏移量范圍時(shí)非常小心,則回滾事務(wù)可防止重復(fù)或丟失的消息影響結(jié)果。這給出了精確一次語(yǔ)義的等價(jià)物,甚至對(duì)于聚合產(chǎn)生的輸出也可以使用這種策略,聚合通常很難使其成為冪等的。



本文作者:潘宗昊

本文來(lái)源:IT那活兒(上海新炬王翦團(tuán)隊(duì))

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/129613.html

相關(guān)文章

  • Spark 快速入門(mén)

    摘要:數(shù)據(jù)科學(xué)任務(wù)主要是數(shù)據(jù)分析領(lǐng)域,數(shù)據(jù)科學(xué)家要負(fù)責(zé)分析數(shù)據(jù)并建模,具備統(tǒng)計(jì)預(yù)測(cè)建模機(jī)器學(xué)習(xí)等方面的經(jīng)驗(yàn),以及一定的使用或語(yǔ)言進(jìn)行編程的能力。監(jiān)控運(yùn)行時(shí)性能指標(biāo)信息。 Spark Spark 背景 什么是 Spark 官網(wǎng):http://spark.apache.org Spark是一種快速、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開(kāi)源,20...

    wangshijun 評(píng)論0 收藏0
  • Spark SQL知識(shí)點(diǎn)與實(shí)戰(zhàn)

    摘要:是最新的查詢(xún)起始點(diǎn),實(shí)質(zhì)上是和的組合,所以在和上可用的在上同樣是可以使用的。轉(zhuǎn)換為轉(zhuǎn)換為其實(shí)就是對(duì)的封裝,所以可以直接獲取內(nèi)部的注意此時(shí)得到的存儲(chǔ)類(lèi)型為是具有強(qiáng)類(lèi)型的數(shù)據(jù)集合,需要提供對(duì)應(yīng)的類(lèi)型信息。Spark SQL概述1、什么是Spark SQLSpark SQL是Spark用于結(jié)構(gòu)化數(shù)據(jù)(structured data)處理的Spark模塊。與基本的Spark RDD API不同,Sp...

    番茄西紅柿 評(píng)論0 收藏2637

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<