摘要:顯然,這多帶帶執(zhí)行不起作用這將通過子操作符被作為像是自己的調(diào)度任務(wù)中那樣運(yùn)行。子也必須有個可用調(diào)度即使子作為其父的一部分被觸發(fā)子也必須有一個調(diào)度如果他們的調(diào)度是設(shè)成,這個子操作符將不會觸發(fā)任何任務(wù)。這兩個例子都是緣起子操作符被當(dāng)做了回填工作。
前言
Airbnb的數(shù)據(jù)工程師 Maxime Beauchemin 激動地表示道:Airflow 是一個我們正在用的工作流調(diào)度器,現(xiàn)在的版本已經(jīng)更新到1.6.1了,并且引入了一些列調(diào)度引擎的改革。我們喜歡它是因?yàn)樗鼘懘a太容易了,也便于調(diào)試和維護(hù)。我們也喜歡全都用他來寫代碼,而不是像xml那樣的配置文件用來描述DAG。更不用說,我們顯然不用再學(xué)習(xí)太多東西。
任務(wù)隔離在一個分布式環(huán)境中,宕機(jī)是時有發(fā)生的。Airflow通過自動重啟任務(wù)來適應(yīng)這一變化。到目前為止一切安好。當(dāng)我們有一系列你想去重置狀態(tài)的任務(wù)時,你就會發(fā)現(xiàn)這個功能簡直是救世主。為了解決這個問題,我們的策略是建立子DAG。這個子DAG任務(wù)將自動重試自己的那一部分,因此,如果你以子DAG設(shè)置任務(wù)為永不重試,那么憑借子DAG操作你就可以得到整個DAG成敗的結(jié)果。如果這個重置是DAG的第一個任務(wù)設(shè)置子DAG的策略就會非常有效,對于有一個相對復(fù)雜的依賴關(guān)系結(jié)構(gòu)設(shè)置子DAG是非常棒的做法。注意到子DAG操作任務(wù)不會正確地標(biāo)記失敗任務(wù),除非你從GitHub用了最新版本的Airflow。解決這個問題的另外一個策略是使用重試柄:
def make_spooq_exporter(table, schema, task_id, dag): return SpooqExportOperator( jdbc_url=("jdbc:mysql://%s/%s?user=user&password=pasta" % (TARGET_DB_HOST,TARGET_DB_NAME)), target_table=table, hive_table="%s.%s" % (schema, table), dag=dag, on_retry_callback=truncate_db, task_id=task_id) def truncate_db(context): hook = MySqlHook("clean_db_export") hook.run( "truncate `%s`"%context["task_instance"].task.target_table, autocommit=False, parameters=None)
這樣你的重試柄就可以將任務(wù)隔離,每次執(zhí)行某個特定的任務(wù)。
代碼定義任務(wù)這在執(zhí)行一個特定的可重復(fù)的任務(wù)時非常管用。用代碼來定義工作流是這個系統(tǒng)最強(qiáng)大之處是你可以以編碼的方式產(chǎn)生DAG。這在在沒有人工干預(yù)的情況下自動接入新的數(shù)據(jù)源的時候非常有用。
我們借助現(xiàn)有的日志目錄將檢查HDFS日志融入DAG,并且在每次融入這些數(shù)據(jù)的時候在每個目錄下產(chǎn)生一個任務(wù)。示例代碼如下:
lognames = list( hdfs.list_filenames(conf.get("incoming_log_path"), full_path=False)) for logname in lognames: # TODO 使用適當(dāng)?shù)恼齽t表達(dá)式來過濾掉不良日志名,使得Airflow 能用符合特定的字符找出相應(yīng)任務(wù)的名字 if logname not in excluded_logs and "%" not in logname and "@" not in logname: ingest = LogIngesterOperator( # 因?yàn)閘og_name以作為unicode返回值,所以需要用str()包裝task_id task_id=str("ingest_%s" % logname), db=conf.get("hive_db"), logname=logname, on_success_callback=datadog_api.check_data_lag, dag=dp_dag ) ingest.set_upstream(transfer_from_incoming) ingest.set_downstream(transform_hive)今日事,今日畢
在每天結(jié)束的時候執(zhí)行每日任務(wù),而不是在當(dāng)天工作開始的時候去執(zhí)行這些任務(wù)。你不能將子DAG放在DAG文件夾下,換句話說除非你保管一類DAG,否則你不可以將子DAG放在自己的模塊中。
子DAG與主DAG不能嵌套或者更具體地說就是,雖然你也可以將子DAG放在DAG文件夾下,但是接著子DAG將先主DAG一樣運(yùn)行自己的調(diào)度。這里是一個兩個DAG的例子(假設(shè)他們同時在DAG文件夾下,也就是所謂的差DAG)這里的子DAG將在主DAG中通過調(diào)度器被多帶帶調(diào)度。
from airflow.models import DAG from airflow.operators import PythonOperator, SubDagOperator from bad_dags.subdag import hive_dag from datetime import timedelta, datetime main_dag = DAG( dag_id="main_dag", schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21) ) # 顯然,這多帶帶執(zhí)行不起作用 transform_hive = SubDagOperator( subdag=hive_dag, task_id="hive_transform", dag=main_dag, trigger_rule=TriggerRule.ALL_DONE )
from airflow.models import DAG from airflow.operators import HiveOperator from datetime import timedelta, datetime # 這將通過子DAG操作符被作為像是自己的調(diào)度任務(wù)中那樣運(yùn)行。 hive_dag = DAG("main_dag.hive_transform", # 注意到這里的重復(fù)迭代 schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21)) hive_transform = HiveOperator(task_id="flatten_tables", hql=send_charge_hql, dag=dag)
除非你真的想這個子DAG被主DAG調(diào)度。
我們通過使用工廠函數(shù)解決這個問題。這是一個優(yōu)勢那就是 主DAG可以傳遞一些必要的參數(shù)到子DAG,因此他們在調(diào)度的時候其他參數(shù)也自動賦值了。當(dāng)你的主DAG發(fā)生變化時,我們不需要去跟蹤參數(shù)。
在下面的例子中,假設(shè)DAG是所謂的好DAG:
from airflow.models import DAG from airflow.operators import PythonOperator, SubDagOperator from good_dags.subdag import hive_dag from datetime import timedelta, datetime main_dag = DAG( dag_id="main_dag", schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21) ) # 顯然,這多帶帶執(zhí)行不起作用 transform_hive = SubDagOperator( subdag=hive_dag(main_dag.start_date, main_dag.schedule_interval), task_id="hive_transform", dag=main_dag, trigger_rule=TriggerRule.ALL_DONE )
from airflow.models import DAG from airflow.operators import HiveOperator # 對調(diào)度程序來說,沒有Dag的頂層模塊就不起作用了 def hive_dag(start_date, schedule_interval): # you might like to make the name a parameter too dag = DAG("main_dag.hive_transform", # 注意這里的設(shè)置 schedule_interval=schedule_interval, start_date=start_date) hive_transform = HiveOperator(task_id="flatten_tables", hql=send_charge_hql, dag=dag) return dag
使用工廠類使得子DAG在保障調(diào)度器從開始運(yùn)行時就可維護(hù)就更強(qiáng)。
另一種模式是將主DAG和子DAG之間的共享設(shè)為默認(rèn)參數(shù),然后傳遞到工廠函數(shù)中去,(感謝 Maxime 的建議)。
子DAG也必須有個可用調(diào)度即使子DAG作為其父DAG的一部分被觸發(fā)子DAG也必須有一個調(diào)度,如果他們的調(diào)度是設(shè)成None,這個子DAG操作符將不會觸發(fā)任何任務(wù)。
更糟糕的是,如果你對子DAG被禁用,接著你又去運(yùn)行子DAG操作,而且還沒運(yùn)行完,那么以后你的子DAG就再也運(yùn)行不起來了。
這將快速導(dǎo)致你的主DAG同時運(yùn)行的任務(wù)數(shù)量一下就達(dá)到上限(默認(rèn)一次寫入是16個)并且這將導(dǎo)致調(diào)度器形同虛設(shè)。
這兩個例子都是緣起子DAG操作符被當(dāng)做了回填工作。這里可以看到這個
什么是DagRun:遲到的禮物Airflow1.6的最大更新是引入了DagRun?,F(xiàn)在,任務(wù)調(diào)度實(shí)例是由DagRun對象來創(chuàng)建的。
相應(yīng)地,如果你想跑一個DAG而不是回填工作,你可能就需要用到DagRun。
你可以在代碼里寫一些airflow trigger_dag命令,或者也可以通過DagRun頁面來操作。
這個巨大的優(yōu)勢就是調(diào)度器的行為可以被很好的理解,就像它可以遍歷DagRun一樣,基于正在運(yùn)行的DagRun來調(diào)度任務(wù)實(shí)例。
這個服務(wù)器現(xiàn)在可以向我們顯示每一個DagRun的狀態(tài),并且將任務(wù)實(shí)例的狀態(tài)與之關(guān)聯(lián)。
DagRun是怎樣被調(diào)度的新的模型也提供了一個控制調(diào)度器的方法。下一個DagRun會基于數(shù)據(jù)庫里上一個DagRun的實(shí)例來調(diào)度。
除了服務(wù)峰值的例外之外,大多數(shù)實(shí)例是處于運(yùn)行還是結(jié)束狀態(tài)都不會影響整體任務(wù)的運(yùn)行。
這意味著如果你想返回一個在現(xiàn)有和歷史上不連續(xù)集合的部分DagRun ,你可以簡單刪掉這個DagRun任務(wù)實(shí)例,并且設(shè)置DagRun的狀態(tài)為正在運(yùn)行。
按照我們的經(jīng)驗(yàn),一個需要占用很長時間運(yùn)行的調(diào)度器至少是個最終沒有安排任務(wù)的CeleryExcecutor。很不幸,我們?nèi)匀徊恢谰唧w的原因。不過慶幸的是,Airflow 內(nèi)建了一個以num_runs形式作標(biāo)記的權(quán)宜之計。它為調(diào)度器確認(rèn)了許多迭代器來在它退出之前確保執(zhí)行這個循環(huán)。我們運(yùn)行了10個迭代,Airbnb一般運(yùn)行5個。注意到這里如果用LocalExecutor將會引發(fā)一些問題。我們現(xiàn)在使用chef來重啟executor;我們正計劃轉(zhuǎn)移到supervisor上來自動重啟。
操作符的依賴于依賴包這個airflow.operators包有一些魔法,它讓我們只能使用正確導(dǎo)入的操作符。這意味著如果你沒有安裝必要的依賴,你的操作符就會失效。
這是所有的 Fork! (現(xiàn)在)Airflow 是正在快速迭代中,而且不只是Airbnb自己在做貢獻(xiàn)。Airflow將會繼續(xù)演化,而我也將寫更多有關(guān)Airflow的技巧供大家學(xué)習(xí)使用。
如果你也對解決這些問題感興趣,那就加入我們吧!
參考資料Airflow官方文檔
docker-airflow
Airflow 的GitHub地址
Designing workflow with Airflow
Airflow Demo
pandastrike:Airflow
Airflow review
Airflow and Hive
Youtube: Airflow An open source platform to author and monitor data pipelines
Hackenews: Airflow by airbnb is a nice alternative to luigi
Luigi vs Airflow vs Pinball
Existing Workflow systems
Jonathan Dinu: Scalable Pipelines with Luigi or: I’ll have the Data Engineering, hold the Java!
AirFlow Joins Apache Incubator
Managing Containerized Data Pipeline Dependencies With Luigi
Petabyte-Scale Data Pipelines with Docker, Luigi and Elastic Spot Instances
工作流調(diào)研 oozie vs azkaban
日拱一卒
Existing Workflow systems
Awesome Pipeline
rediit: Azkaban vs Oozie vs Airflow
推薦閱讀董老師在硅谷:[硅谷熱門公司技術(shù)巡禮]1.Airbnb基礎(chǔ)數(shù)據(jù)架構(gòu)
董老師在硅谷:DAG、Workflow 系統(tǒng)設(shè)計、Airflow 與開源的那些事兒
[原]數(shù)據(jù)流編程教程:如何使用Airflow構(gòu)建數(shù)據(jù)科學(xué)工作流
原作者:Marcin Tustin 翻譯:Harry Zhu
英文原文地址:Airflow: Tips, Tricks, and Pitfalls作為分享主義者(sharism),本人所有互聯(lián)網(wǎng)發(fā)布的圖文均遵從CC版權(quán),轉(zhuǎn)載請保留作者信息并注明作者 Harry Zhu 的 FinanceR專欄:https://segmentfault.com/blog/harryprince,如果涉及源代碼請注明GitHub地址:https://github.com/harryprince。微信號: harryzhustudio
商業(yè)使用請聯(lián)系作者。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/37915.html
摘要:概述是一個我們正在用的工作流調(diào)度器,相對于傳統(tǒng)的任務(wù)管理,很好的為我們理清了復(fù)雜的任務(wù)依賴關(guān)系監(jiān)控任務(wù)執(zhí)行的情況。步驟三修改默認(rèn)數(shù)據(jù)庫找到配置文件修改配置注意到,之前使用的的方式是行不通的。微信號商業(yè)使用請聯(lián)系作者。 showImg(https://segmentfault.com/img/remote/1460000006760428?w=1918&h=1556); 概述 Airfl...
摘要:在同行評議上,我們檢查方法論的改進(jìn)現(xiàn)有工作的關(guān)聯(lián)性以及準(zhǔn)確的解釋性聲明。學(xué)習(xí)價值通過之前一系列的工作,現(xiàn)在數(shù)據(jù)科學(xué)家可以分享自己的新方法論代碼技術(shù)并且加快品牌化推廣,讓團(tuán)隊(duì)之外的人可以快速了解自己的領(lǐng)域。 頑疾 Airbnb的數(shù)據(jù)團(tuán)隊(duì)很重要的一個職責(zé)就是傳播基于數(shù)據(jù)的決策方法。我們將數(shù)據(jù)的獲取民主化,使得每一個Airbnb的成員都可以量化他們基于數(shù)據(jù)的決策影響力并且借此洞察用戶偏好,提...
摘要:面試如何防騙一份優(yōu)秀的前端開發(fā)工程師簡歷是怎么樣的作為,有哪些一般人我都告訴他,但是他都不聽的忠告如何面試前端工程師 更多資源請Star:https://github.com/maidishike... 文章轉(zhuǎn)自:https://github.com/jsfront/mo... 3月份前端資源分享 1. Javascript 使用judge.js做信息判斷 javascript...
摘要:蠎周刊年度最贊親俺們又來回顧又一個偉大的年份兒包去年最受歡迎的文章和項(xiàng)目如果你錯過了幾期就這一期不會丟失最好的嗯哼還為你和你的準(zhǔn)備了一批紀(jì)念裇從這兒獲取任何時候如果想分享好物給大家在這兒提交喜歡我們收集的任何意見建議通過來吧原文 Title: 蠎周刊 2015 年度最贊Date: 2016-01-09 Tags: Weekly,Pycoder,Zh Slug: issue-198-to...
摘要:概述我非常認(rèn)同前百度數(shù)據(jù)工程師現(xiàn)神策分析創(chuàng)始人桑老師最近談到的數(shù)據(jù)分析三重境界統(tǒng)計計數(shù)多維分析機(jī)器學(xué)習(xí)數(shù)據(jù)分析的統(tǒng)計計數(shù)和多維分析,我們通常稱之為數(shù)據(jù)探索式分析,這個步驟旨在了解數(shù)據(jù)的特性,有助于我們進(jìn)一步挖掘數(shù)據(jù)的價值。 showImg(https://camo.githubusercontent.com/f98421e503a81176b003ddd310d97e1e1214625...
閱讀 1967·2021-11-15 11:46
閱讀 1180·2021-10-26 09:49
閱讀 1913·2021-10-14 09:42
閱讀 3449·2021-09-26 09:55
閱讀 893·2019-08-30 13:58
閱讀 1114·2019-08-29 16:40
閱讀 3545·2019-08-26 10:27
閱讀 669·2019-08-23 18:18