摘要:遇到問題分析之后搞了個(gè)還沒仔細(xì)了解可參考的與的有區(qū)別及并發(fā)控制先看看的,與的這幾個(gè)概念。一個(gè)可以認(rèn)為就是會(huì)最終輸出一個(gè)結(jié)果的一條由組織而成的計(jì)算。在中,我們通過使用新極大地增強(qiáng)對(duì)狀態(tài)流處理的支持。
Spark Streaming遇到問題分析
1、Spark2.0之后搞了個(gè)Structured Streaming還沒仔細(xì)了解,可參考:https://github.com/lw-lin/Coo...
2、Spark的Job與Streaming的Job有區(qū)別及Streaming Job并發(fā)控制:先看看Spark Streaming 的 JobSet, Job,與 Spark Core 的 Job, Stage, TaskSet, Task 這幾個(gè)概念。
[Spark Streaming]
JobSet 的全限定名是:org.apache.spark.streaming.scheduler.JobSet
Job 的全限定名是:org.apache.spark.streaming.scheduler.Job
[Spark Core]
Job 沒有一個(gè)對(duì)應(yīng)的實(shí)體類,主要是通過 jobId:Int 來表示一個(gè)具體的 job
Stage 的全限定名是:org.apache.spark.scheduler.Stage
TaskSet 的全限定名是:org.apache.spark.scheduler.TaskSet
Task 的全限定名是:org.apache.spark.scheduler.Task
Spark Core 的 Job, Stage, Task 就是我們“日?!闭?wù)?Spark任務(wù)時(shí)所說的那些含義,而且在 Spark 的 WebUI 上有非常好的體現(xiàn),比如下圖就是 1 個(gè) Job 包含duo1 個(gè) Stage;3 個(gè) Stage 各包含 8, 2, 4 個(gè) Task。而 TaskSet 則是 Spark Core 的內(nèi)部代碼里用的類,是 Task 的集合,和 Stage 是同義的。
Spark Core中:一個(gè)RDD DAG Graph可以生成一個(gè)或多個(gè)Job。一個(gè)Job可以認(rèn)為就是會(huì)最終輸出一個(gè)結(jié)果RDD的一條由RDD組織而成的計(jì)算。Job在spark里應(yīng)用里是一個(gè)被調(diào)度的單位。
Streaming中:一個(gè)batch的數(shù)據(jù)對(duì)應(yīng)一個(gè)DStreamGraph,而一個(gè)DStreamGraph包含一或多個(gè)關(guān)于DStream的輸出操作,每一個(gè)輸出對(duì)應(yīng)于一個(gè)Job,一個(gè)DStreamGraph對(duì)應(yīng)一個(gè)JobSet,里面包含一個(gè)或多個(gè)Job。用下圖表示如下:
生產(chǎn)的JobSet會(huì)提交給JobScheduler去執(zhí)行,JobScheduler包含了一個(gè)線程池,通過spark.streaming.concurrentJobs參數(shù)來控制其大小,也就是可以并發(fā)執(zhí)行的job數(shù),默認(rèn)是1.不過這個(gè)參數(shù)的設(shè)置以集群中executor機(jī)器的cpu core為準(zhǔn),比如集群中有2臺(tái)4核executor,那么spark.streaming.concurrentJobs可以設(shè)置為2x4=8. 同時(shí)你還可以控制調(diào)度策略:spark.scheduler.mode (FIFO/FAIR) 默認(rèn)是FIFO
我們的問題就是,運(yùn)行一段時(shí)間之后發(fā)現(xiàn)處理速度跟不上了,后來才發(fā)現(xiàn)原來這個(gè)參數(shù)默認(rèn)是1,而我們代碼中對(duì)于每個(gè)batch有兩個(gè)輸出操作,這樣會(huì)產(chǎn)生兩個(gè)job,而同一時(shí)間只能執(zhí)行一個(gè)job,慢慢地處理速度就跟不上生產(chǎn)速度了。所以實(shí)際中,請(qǐng)根據(jù)具體情況調(diào)整該參數(shù)。
此處參考:
https://github.com/lw-lin/Coo...
http://blog.csdn.net/xueba207...
http://www.jianshu.com/p/ab38...
3、 Spark Streaming緩存數(shù)據(jù)清理:調(diào)用cache有兩種情況,一種是調(diào)用DStream.cache,第二種是RDD.cache。事實(shí)上他們是完全一樣的。
DStream的cache 動(dòng)作只是將DStream的變量storageLevel 設(shè)置為MEMORY_ONLY_SER,然后在產(chǎn)生(或者獲取)RDD的時(shí)候,調(diào)用RDD的persit方法進(jìn)行設(shè)置。所以DStream.cache 產(chǎn)生的效果等價(jià)于RDD.cache(也就是你自己調(diào)用foreachRDD 將RDD 都設(shè)置一遍)
注意,當(dāng)你調(diào)用dstream.cache緩存數(shù)據(jù)的時(shí)候,Streaming在該batch處理完畢后,默認(rèn)會(huì)立即清除這個(gè)緩存,通過參數(shù)spark.streaming.unpersist 你是可以決定是否手工控制是否需要對(duì)cache住的數(shù)據(jù)進(jìn)行清理.
參考:
http://www.jianshu.com/p/f068...
https://github.com/apache/spa...
有兩種方式:updateStateByKey和mapWithState(Spark 1.6以后新增的)。
推薦使用mapWithState, 實(shí)際使用中,我發(fā)現(xiàn)updateStateByKey會(huì)慢慢拖慢處理速度,問題描述與該情況類似:http://comments.gmane.org/gma...
許多復(fù)雜流處理流水線程序必須將狀態(tài)保持一段時(shí)間,例如,如果你想實(shí)時(shí)了解網(wǎng)站用戶行為,你需要將網(wǎng)站上各“用戶會(huì)話(user session)”信息保存為持久狀態(tài)并根據(jù)用戶的行為對(duì)這一狀態(tài)進(jìn)行持續(xù)更新。這種有狀態(tài)的流計(jì)算可以在Spark Streaming中使用updateStateByKey 方法實(shí)現(xiàn)。
在Spark 1.6 中,我們通過使用新API mapWithState極大地增強(qiáng)對(duì)狀態(tài)流處理的支持。該新的API提供了通用模式的內(nèi)置支持,而在以前使用updateStateByKey 方法來實(shí)現(xiàn)這一相同功能(如會(huì)話超時(shí))需要進(jìn)行手動(dòng)編碼和優(yōu)化。因此,mapWithState 方法較之于updateStateByKey方法,有十倍之多的性能提升。
使用mapWithState方法進(jìn)行狀態(tài)流處理
盡管現(xiàn)有DStream中updateStateByKey方法能夠允許用戶執(zhí)行狀態(tài)計(jì)算,但使用mapWithState方法能夠讓用戶更容易地表達(dá)程序邏輯,同時(shí)讓性能提升10倍之多。讓我們通過一個(gè)例子對(duì)mapWithState方法的優(yōu)勢(shì)進(jìn)行闡述。
假設(shè)我們要根據(jù)用戶歷史動(dòng)作對(duì)某一網(wǎng)站的用戶行為進(jìn)行實(shí)時(shí)分析,對(duì)各個(gè)用戶,我們需要保持用戶動(dòng)作的歷史信息,然后根據(jù)這些歷史信息得到用戶的行為模型并輸出到下游的數(shù)據(jù)存儲(chǔ)當(dāng)中。
在Spark Streaming中構(gòu)建此應(yīng)用程序時(shí),我們首先需要獲取用戶動(dòng)作流作為輸入(例如通過Kafka或Kinesis),然后使用mapWithState 方法對(duì)輸入進(jìn)行轉(zhuǎn)換操作以生成用戶模型流,最后將處理后的數(shù)據(jù)流保存到數(shù)據(jù)存儲(chǔ)當(dāng)中。
mapWithState方法可以通過下面的抽象方式進(jìn)行理解,假設(shè)它是將用戶動(dòng)作和當(dāng)前用戶會(huì)話作為輸入的一個(gè)算子(operator),基于某個(gè)輸入動(dòng)作,該算子能夠有選擇地更新用戶會(huì)話,然后輸出更新后的用戶模型作為下游操作的輸入。開發(fā)人員在定義mapWithState方法時(shí)可以指定該更新函數(shù)。
首先我們定義狀態(tài)數(shù)據(jù)結(jié)構(gòu)及狀態(tài)更新函數(shù):
def stateUpdateFunction( userId: UserId, newData: UserAction, stateData: State[UserSession]): UserModel = { val currentSession = stateData.get()// 獲取當(dāng)前會(huì)話數(shù)據(jù) val updatedSession = ... // 使用newData計(jì)算更新后的會(huì)話 stateData.update(updatedSession) // 更新會(huì)話數(shù)據(jù) val userModel = ... // 使用updatedSession計(jì)算模型 return userModel // 將模型發(fā)送給下游操作 } // 用去動(dòng)作構(gòu)成的Stream,用戶ID作為key val userActions = ... // key-value元組(UserId, UserAction)構(gòu)成的stream // 待提交的數(shù)據(jù)流 val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction)) //-------------------------------------------------------------------------------------- //java的例子 Function3, State , Void> mappingFunction = new Function3 , State , Void>() { @Override public Void call(String key,Optional value, State state) { // Use state.exists(), state.get(), state.update() and state.remove() // to manage state, and return the necessary string LiveInfo info=value.orNull(); if(info!=null){ state.update(info.getChannel()+":::"+info.getTime()); } return null; } }; //處理計(jì)數(shù) samples //先將ip作為key .mapPartitionsToPair((v)->{ List > list=new ArrayList<>(); while(v.hasNext()){ Tuple2 tmpv = v.next(); String channelName=tmpv._1(); String ip=tmpv._2().getIp(); list.add(new Tuple2 (ip,tmpv._2())); } return list.iterator(); }) //更新狀態(tài) .mapWithState( StateSpec.function(mappingFunction) //4小時(shí)沒有更新則剔除 .timeout(Durations.minutes(4*60)) ) //獲得狀態(tài)快照流 .stateSnapshots() //后續(xù)操作
mapWithState的新特性和性能改進(jìn)
原生支持會(huì)話超時(shí)
許多基于會(huì)話的應(yīng)用程序要求具備超時(shí)機(jī)制,當(dāng)某個(gè)會(huì)話在一定的時(shí)間內(nèi)(如用戶沒有顯式地注銷而結(jié)束會(huì)話)沒有接收到新數(shù)據(jù)時(shí)就應(yīng)該將其關(guān)閉,與使用updateStateByKey方法時(shí)需要手動(dòng)進(jìn)行編碼實(shí)現(xiàn)所不同的是,開發(fā)人員可以通過mapWithState方法直接指定其超時(shí)時(shí)間。
userActions.mapWithState(StateSpec.function(stateUpdateFunction).timeout(Minutes(10)))
除超時(shí)機(jī)制外,開發(fā)人員也可以設(shè)置程序啟動(dòng)時(shí)的分區(qū)模式和初始狀態(tài)信息。
任意數(shù)據(jù)都能夠發(fā)送到下游
與updateStateByKey方法不同,任意數(shù)據(jù)都可以通過狀態(tài)更新函數(shù)將數(shù)據(jù)發(fā)送到下游操作,這一點(diǎn)已經(jīng)在前面的例子中有說明(例如通過用戶會(huì)話狀態(tài)返回用戶模型),此外,最新狀態(tài)的快照也能夠被訪問。
val userSessionSnapshots = userActions.mapWithState(statSpec).snapshotStream()
變量userSessionSnapshots 為一個(gè)DStream,其中各個(gè)RDD為各批(batch)數(shù)據(jù)處理后狀態(tài)更新會(huì)話的快照,該DStream與updateStateByKey方法返回的DStream是等同的。
更高的性能
最后,與updateStateByKey方法相比,使用mapWithState方法能夠得到6倍的低延遲同時(shí)維護(hù)的key狀態(tài)數(shù)量要多10倍。
此部分參考:
http://blog.csdn.net/lively19...
http://blog.csdn.net/zengxiao...
https://databricks.gitbooks.i...
5、如何理解時(shí)間窗口?時(shí)間窗口中數(shù)據(jù)是否會(huì)存在重復(fù)?
上圖里 batch duration = 1, window length = 3, sliding interval = 2
任何情況下 Job Submit 是以 batch duration 為準(zhǔn), 對(duì)于 window 操作,每隔 sliding interval 才去實(shí)際生成 RDD(每隔batch都會(huì)生成一個(gè)RDD,只是到windowDStream的時(shí)候做了合并,生成UnionRDD或者PartitionerAwareUnionRDD,最后輸出一個(gè)RDD),每次計(jì)算的結(jié)果包括 window length 個(gè) batch 的數(shù)據(jù)。
是否會(huì)存在重復(fù)?看下面兩張圖:
答案是:取決于你怎么設(shè)置窗口的兩個(gè)參數(shù)
(窗口長(zhǎng)度)window length – 窗口覆蓋的時(shí)間長(zhǎng)度
(滑動(dòng)距離)sliding interval – 窗口啟動(dòng)的時(shí)間間隔
更深入請(qǐng)參考:
https://github.com/lw-lin/Coo...
http://concord.io/posts/windo...
http://www.cnblogs.com/haozhe...
6、WAL(Write Ahead Log,預(yù)寫日志)與容錯(cuò)機(jī)制WAL是在 1.2 版本中就添加的特性。作用就是,將數(shù)據(jù)通過日志的方式寫到可靠的存儲(chǔ),比如 HDFS、s3,在 driver 或 worker failure 時(shí)可以從在可靠存儲(chǔ)上的日志文件恢復(fù)數(shù)據(jù)。WAL 在 driver 端和 executor 端都有應(yīng)用。
WAL使用在文件系統(tǒng)和數(shù)據(jù)庫用于數(shù)據(jù)操作的持久性,先把數(shù)據(jù)寫到一個(gè)持久化的日志中,然后對(duì)數(shù)據(jù)做操作,如果操作過程中系統(tǒng)掛了,恢復(fù)的時(shí)候可以重新讀取日志文件再次進(jìn)行操作。
對(duì)于像kafka和flume這些使用接收器來接收數(shù)據(jù)的數(shù)據(jù)源。接收器作為一個(gè)長(zhǎng)時(shí)間的任務(wù)運(yùn)行在executor中,負(fù)責(zé)從數(shù)據(jù)源接收數(shù)據(jù),如果數(shù)據(jù)源支持的話,向數(shù)據(jù)源確認(rèn)接收到數(shù)據(jù),然后把數(shù)據(jù)存儲(chǔ)在executor的內(nèi)存中,然后driver在exector上運(yùn)行任務(wù)處理這些數(shù)據(jù)。如果wal啟用了,所有接收到的數(shù)據(jù)會(huì)保存到一個(gè)日志文件中去(HDFS), 這樣保存接收數(shù)據(jù)的持久性,此外,只有在數(shù)據(jù)寫入到log中之后接收器才向數(shù)據(jù)源確認(rèn),這樣drive重啟后那些保存在內(nèi)存中但是沒有寫入到log中的數(shù)據(jù)將會(huì)重新發(fā)送,這兩點(diǎn)保證的數(shù)據(jù)的無丟失。
啟用WAL:
給streamingContext設(shè)置checkpoint的目錄,該目錄必須是HADOOP支持的文件系統(tǒng),用來保存WAL和做Streaming的checkpoint
spark.streaming.receiver.writeAheadLog.enable 設(shè)置為true
正常流程圖:
解析
1:藍(lán)色的箭頭表示接收的數(shù)據(jù),接收器把數(shù)據(jù)流打包成塊,存儲(chǔ)在executor的內(nèi)存中,如果開啟了WAL,將會(huì)把數(shù)據(jù)寫入到存在容錯(cuò)文件系統(tǒng)的日志文件中
2:青色的箭頭表示提醒driver, 接收到的數(shù)據(jù)塊的元信息發(fā)送給driver中的StreamingContext, 這些元數(shù)據(jù)包括:executor內(nèi)存中數(shù)據(jù)塊的引用ID和日志文件中數(shù)據(jù)塊的偏移信息
3:紅色箭頭表示處理數(shù)據(jù),每一個(gè)批處理間隔,StreamingContext使用塊信息用來生成RDD和jobs. SparkContext執(zhí)行這些job用于處理executor內(nèi)存中的數(shù)據(jù)塊
4:黃色箭頭表示checkpoint這些計(jì)算,以便于恢復(fù)。流式處理會(huì)周期的被checkpoint到文件中
當(dāng)一個(gè)失敗的driver重啟以后,恢復(fù)流程如下:
1:黃色的箭頭用于恢復(fù)計(jì)算,checkpointed的信息是用于重啟driver,重新構(gòu)造上下文和重啟所有的receiver
2: 青色箭頭恢復(fù)塊元數(shù)據(jù)信息,所有的塊信息對(duì)已恢復(fù)計(jì)算很重要
3:重新生成未完成的job(紅色箭頭),會(huì)使用到2恢復(fù)的元數(shù)據(jù)信息
4:讀取保存在日志中的塊(藍(lán)色箭頭),當(dāng)job重新執(zhí)行的時(shí)候,塊數(shù)據(jù)將會(huì)直接從日志中讀取,
5:重發(fā)沒有確認(rèn)的數(shù)據(jù)(紫色的箭頭)。緩沖的數(shù)據(jù)沒有寫到WAL中去將會(huì)被重新發(fā)送。
1、WAL在 driver 端的應(yīng)用
用于寫日志的對(duì)象 writeAheadLogOption: WriteAheadLog在 StreamingContext 中的 JobScheduler 中的 ReceiverTracker 的 ReceivedBlockTracker 構(gòu)造函數(shù)中被創(chuàng)建,ReceivedBlockTracker 用于管理已接收到的 blocks 信息。需要注意的是,這里只需要啟用 checkpoint 就可以創(chuàng)建該 driver 端的 WAL 管理實(shí)例,將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true。
首選需要明確的是,ReceivedBlockTracker 通過 WAL 寫入 log 文件的內(nèi)容是3種事件(當(dāng)然,會(huì)進(jìn)行序列化):
BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo);即新增了一個(gè) block 及該 block 的具體信息,包括 streamId、blockId、數(shù)據(jù)條數(shù)等
BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks);即為某個(gè) batchTime 分配了哪些 blocks 作為該 batch RDD 的數(shù)據(jù)源
BatchCleanupEvent(times: Seq[Time]);即清理了哪些 batchTime 對(duì)應(yīng)的 blocks
2、WAL 在 executor 端的應(yīng)用
Receiver 接收到的數(shù)據(jù)會(huì)源源不斷的傳遞給 ReceiverSupervisor,是否啟用 WAL 機(jī)制(即是否將 spark.streaming.receiver.writeAheadLog.enable 設(shè)置為 true)會(huì)影響 ReceiverSupervisor 在存儲(chǔ) block 時(shí)的行為:
不啟用 WAL:你設(shè)置的StorageLevel是什么,就怎么存儲(chǔ)。比如MEMORY_ONLY只會(huì)在內(nèi)存中存一份,MEMORY_AND_DISK會(huì)在內(nèi)存和磁盤上各存一份等
啟用 WAL:在StorageLevel指定的存儲(chǔ)的基礎(chǔ)上,寫一份到 WAL 中。存儲(chǔ)一份在 WAL 上,更不容易丟數(shù)據(jù)但性能損失也比較大
3、WAL 使用建議
關(guān)于是否要啟用 WAL,要視具體的業(yè)務(wù)而定:
若可以接受一定的數(shù)據(jù)丟失,則不需要啟用 WAL,WAL開啟了以后會(huì)減少Spark Streaming處理數(shù)據(jù)的吞吐,因?yàn)樗薪邮盏臄?shù)據(jù)會(huì)被寫到到容錯(cuò)的文件系統(tǒng)上,這樣文件系統(tǒng)的吞吐和網(wǎng)絡(luò)帶寬將成為瓶頸。
若完全不能接受數(shù)據(jù)丟失,那就需要同時(shí)啟用 checkpoint 和 WAL,checkpoint 保存著執(zhí)行進(jìn)度(比如已生成但未完成的 jobs),WAL 中保存著 blocks 及 blocks 元數(shù)據(jù)(比如保存著未完成的 jobs 對(duì)應(yīng)的 blocks 信息及 block 文件)。同時(shí),這種情況可能要在數(shù)據(jù)源和 Streaming Application 中聯(lián)合來保證 exactly once 語義
此處參考:
http://www.jianshu.com/p/5e09...
http://www.cnblogs.com/gaoxin...
1、ls操作
列出子目錄和文件(不包括嵌套層級(jí)):listStatus(path,filter)
列出所有(包括嵌套層級(jí)):listFiles(path,true)
Path file = new Path(HDFSConst.live_path); FileStatus[] statuslist = hdfs.listStatus(file, (v) -> { return v.getName().contains(prefix) && !v.getName().endsWith(HDFSConst.processSubffix); }); Listpaths = new ArrayList (); for (FileStatus status : statuslist) { Path tmp = new Path(status.getPath().toString()); RemoteIterator statusIter = hdfs.listFiles(tmp, true); boolean shouldAdd = true; while (statusIter.hasNext()) { LocatedFileStatus status2 = statusIter.next(); if (status2.getPath().toString().contains("/_temporary/")) { shouldAdd = false; break; } } if (shouldAdd) { paths.add(tmp); } }
2、hadoop No FileSystem for scheme:
問題來源:
This is a typical case of the maven-assembly plugin breaking things.
Different JARs (hadoop-commons for LocalFileSystem, hadoop-hdfs for DistributedFileSystem) each contain a different file called org.apache.hadoop.fs.FileSystem in their META-INFO/services directory. This file lists the canonical classnames of the filesystem implementations they want to declare (This is called a Service Provider Interface implemented via java.util.ServiceLoader, see org.apache.hadoop.FileSystem line 2622).
When we use maven-assembly-plugin, it merges all our JARs into one, and all META-INFO/services/org.apache.hadoop.fs.FileSystem overwrite each-other. Only one of these files remains (the last one that was added). In this case, the FileSystem list from hadoop-commons overwrites the list from hadoop-hdfs, so DistributedFileSystem was no longer declared.
How we fixed it
After loading the Hadoop configuration, but just before doing anything FileSystem-related
hadoopConfig.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() ); hadoopConfig.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName() );
不要使用maven-assembly-plugin,使用maven shade插件:
org.apache.maven.plugins maven-shade-plugin 2.3 package shade
3、Wrong FS: hdfs… expected: file:///
Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://my-master:8020"), configuration); Path filePath = new Path(); FSDataInputStream fsDataInputStream = fs.open(filePath); BufferedReader br = new BufferedReader(new InputStreamReader(fsDataInputStream));
此處參考:
https://stackoverflow.com/que...
https://stackoverflow.com/que...
https://stackoverflow.com/que...
RDD DAG(有向無環(huán)圖,Directed Acyclic Graph):每一個(gè)操作生成一個(gè)rdd,rdd之間連一條邊,最后這些rdd和他們之間的邊組成一個(gè)有向無環(huán)圖,就是這個(gè)dag。不只是spark,現(xiàn)在很多計(jì)算引擎都是dag模型的.(有向指的是 RDD 之間的依賴關(guān)系,無環(huán)是因?yàn)?RDD 中數(shù)據(jù)是不可變的)
在Spark作業(yè)調(diào)度系統(tǒng)中,調(diào)度的前提是判斷多個(gè)作業(yè)任務(wù)的依賴關(guān)系,這些作業(yè)任務(wù)之間可能存在因果的依賴關(guān)系,也就是說有些任務(wù)必須先獲得執(zhí)行,然后相關(guān)的依賴任務(wù)才能執(zhí)行,但是任務(wù)之間顯然不應(yīng)出現(xiàn)任何直接或間接的循環(huán)依賴關(guān)系,所以本質(zhì)上這種關(guān)系適合用DAG表示。
DAGscheduler簡(jiǎn)單來說就是負(fù)責(zé)任務(wù)的邏輯調(diào)度,負(fù)責(zé)將作業(yè)拆分成不同階段的具有依賴關(guān)系的多批任務(wù)。DAGscheduler最重要的任務(wù)之一就是計(jì)算作業(yè)和任務(wù)的依賴關(guān)系,制定調(diào)度邏輯。
spark中rdd經(jīng)過若干次transform操作,由于transform操作是lazy的,因此,當(dāng)rdd進(jìn)行action操作時(shí),rdd間的轉(zhuǎn)換關(guān)系也會(huì)被提交上去,得到rdd內(nèi)部的依賴關(guān)系,進(jìn)而根據(jù)依賴,劃分出不同的stage。
DAG是有向無環(huán)圖,一般用來描述任務(wù)之間的先后關(guān)系,spark中的DAG就是rdd內(nèi)部的轉(zhuǎn)換關(guān)系,這些轉(zhuǎn)換關(guān)系會(huì)被轉(zhuǎn)換成依賴關(guān)系,進(jìn)而被劃分成不同階段,從而描繪出任務(wù)的先后順序。
有向無環(huán)圖(Directed Acyclic Graph, DAG)是有向圖的一種,字面意思的理解就是圖中沒有環(huán)。常常被用來表示事件之間的驅(qū)動(dòng)依賴關(guān)系,管理任務(wù)之間的調(diào)度。
在圖論中,如果一個(gè)有向圖無法從任意頂點(diǎn)出發(fā)經(jīng)過若干條邊回到該點(diǎn),則這個(gè)圖是一個(gè)有向無環(huán)圖(DAG圖)。
因?yàn)橛邢驁D中一個(gè)點(diǎn)經(jīng)過兩種路線到達(dá)另一個(gè)點(diǎn)未必形成環(huán),因此有向無環(huán)圖未必能轉(zhuǎn)化成樹,但任何有向樹均為有向無環(huán)圖。
拓?fù)渑判蚴菍?duì)DAG的頂點(diǎn)進(jìn)行排序,使得對(duì)每一條有向邊(u, v),均有u(在排序記錄中)比v先出現(xiàn)。亦可理解為對(duì)某點(diǎn)v而言,只有當(dāng)v的所有源點(diǎn)均出現(xiàn)了,v才能出現(xiàn)。
下圖給出的頂點(diǎn)排序不是拓?fù)渑判?,因?yàn)轫旤c(diǎn)D的鄰接點(diǎn)E比其先出現(xiàn):
DAG可用于對(duì)數(shù)學(xué)和 計(jì)算機(jī)科學(xué)中得一些不同種類的結(jié)構(gòu)進(jìn)行建模。
由于受制于某些任務(wù)必須比另一些任務(wù)較早執(zhí)行的限制,必須排序?yàn)橐粋€(gè)隊(duì) 列的任務(wù)集合可以由一個(gè)DAG圖來呈現(xiàn),其中每個(gè)頂點(diǎn)表示一個(gè)任務(wù),每條邊表示一種限制約束,拓?fù)渑判蛩惴梢杂脕砩梢粋€(gè)有效的序列。
DAG也可以用來模擬信息沿著一個(gè)一 致性的方向通過處理器網(wǎng)絡(luò)的過程。
DAG中得可達(dá)性關(guān)系構(gòu)成了一個(gè)局 部順序,任何有限的局部順序可以由DAG使用可達(dá)性來呈現(xiàn)。
http://www.cnblogs.com/en-hen...
Spark Streaming 的 模塊 1 DAG 靜態(tài)定義 要解決的問題就是如何把計(jì)算邏輯描述為一個(gè) RDD DAG 的“模板”,在后面 Job 動(dòng)態(tài)生成的時(shí)候,針對(duì)每個(gè) batch,都將根據(jù)這個(gè)“模板”生成一個(gè) RDD DAG 的實(shí)例。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/67399.html
摘要:在移動(dòng)端,愛奇藝月度總有效時(shí)長(zhǎng)億小時(shí),穩(wěn)居中國(guó)榜第三名。愛奇藝的峰值事件數(shù)達(dá)到萬秒,在正確性容錯(cuò)性能延遲吞吐量擴(kuò)展性等方面均遇到不小的挑戰(zhàn)。從到愛奇藝主要使用的是和來進(jìn)行流式計(jì)算。作者:陳越晨 整理:劉河 本文將為大家介紹Apache Flink在愛奇藝的生產(chǎn)與實(shí)踐過程。你可以借此了解到愛奇藝引入Apache Flink的背景與挑戰(zhàn),以及平臺(tái)構(gòu)建化流程。主要內(nèi)容如下: 愛奇藝在實(shí)時(shí)計(jì)算方...
摘要:數(shù)據(jù)科學(xué)任務(wù)主要是數(shù)據(jù)分析領(lǐng)域,數(shù)據(jù)科學(xué)家要負(fù)責(zé)分析數(shù)據(jù)并建模,具備統(tǒng)計(jì)預(yù)測(cè)建模機(jī)器學(xué)習(xí)等方面的經(jīng)驗(yàn),以及一定的使用或語言進(jìn)行編程的能力。監(jiān)控運(yùn)行時(shí)性能指標(biāo)信息。 Spark Spark 背景 什么是 Spark 官網(wǎng):http://spark.apache.org Spark是一種快速、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開源,20...
閱讀 3459·2023-04-25 17:19
閱讀 719·2021-11-23 09:51
閱讀 1427·2021-11-08 13:19
閱讀 880·2021-09-29 09:34
閱讀 1782·2021-09-28 09:36
閱讀 1572·2021-09-22 14:59
閱讀 2799·2019-08-29 16:38
閱讀 2134·2019-08-26 13:40