摘要:延遲任務(wù)使用列表結(jié)構(gòu)可以實(shí)現(xiàn)只能執(zhí)行一種任務(wù)的隊(duì)列,也可以實(shí)現(xiàn)通過調(diào)用不同回調(diào)函數(shù)來執(zhí)行不同任務(wù)的隊(duì)列,甚至還可以實(shí)現(xiàn)簡單的優(yōu)先級隊(duì)列。
在處理Web客戶端發(fā)送的命令請求時,某些操作的執(zhí)行時間可能會比我們預(yù)期的更長一些。通過將待執(zhí)行任務(wù)的相關(guān)信息放入隊(duì)列里面,并在之后對隊(duì)列進(jìn)行處理,用戶可以推遲那些需要一段時間才能完成的操作,這種工作交給任務(wù)處理器來執(zhí)行的做法被稱為任務(wù)隊(duì)列(task queue)?,F(xiàn)在有很多專門的任務(wù)隊(duì)列軟件(如ActiveMQ、RabbitMQ、Gearman、Amazon SQS),接下來實(shí)現(xiàn)兩種不同類型的任務(wù)隊(duì)列,第一種隊(duì)列會根據(jù)任務(wù)被插入隊(duì)列的順序來盡快地執(zhí)行任務(wù),第二種隊(duì)列具有安排任務(wù)在未來某個特定時間執(zhí)行的能力。
先進(jìn)先出隊(duì)列除了任務(wù)隊(duì)列之外,還有先進(jìn)先出(FIFO)隊(duì)列、后進(jìn)后出(LIFO)隊(duì)列和優(yōu)先級(priority)隊(duì)列。
使用任務(wù)隊(duì)列來記錄郵件的收信人以及發(fā)送郵件的原因,并構(gòu)建一個可以在郵件發(fā)送服務(wù)器運(yùn)行變得緩慢的時候,以并行方式一次發(fā)送多封郵件的工作進(jìn)程(worker process)。
要編寫的隊(duì)列將以“先到先服務(wù)”(first-come,first-served)的方式發(fā)送郵件,并且無論發(fā)送是否成功,程序都會把發(fā)送結(jié)果記錄到日志里面。Redis的列表結(jié)構(gòu)允許用戶通過RPUSH和LPUSH以及RPOP和LPOP,從列表的兩端推入和彈出元素。郵件隊(duì)列使用RPUSH命令來將待發(fā)送的郵件推入列表的右端,并且因?yàn)楣ぷ鬟M(jìn)程除了發(fā)送郵件之外不需要執(zhí)行其他工作,所以它將使用阻塞版本的彈出命令BLPOP從隊(duì)列中彈出待發(fā)送的郵件,而命令的最大阻塞時限為30秒。
郵件隊(duì)列由一個Redis列表構(gòu)成,包含多個JSON編碼對象。為了將待發(fā)送的郵件推入隊(duì)列里面,程序會獲取發(fā)送郵件所需的全部信息,并將這些信息序列化為JSON對象,最后使用RPUSH命令將JSON對象推入郵件隊(duì)列里面。
def send_sold_email_via_queue(conn, seller, item, price, buyer): data = { "seller_id": seller, "item_id": item, "price": price, "buyer_id": buyer, "time": time.time() } conn.rpush("queue:email", json.dumps(data))
從隊(duì)列里獲取待發(fā)送郵件,程序首先使用BLPOP命令從郵件隊(duì)列里面彈出一個JSON對象,接著通過解碼JSON對象來取得待發(fā)送郵件的相關(guān)信息,最后根據(jù)這些信息來發(fā)送郵件。
def process_sold_email_queue(conn): while not QUIT: packed = conn.blpop(["queue:email"], 30) //獲取一封待發(fā)送郵件 if not packed: //隊(duì)列里面暫時還沒有待發(fā)送郵件,重試 continue to_send = json.loads(packed[1]) //從JSON對象中解碼出郵件信息 try: fetch_data_and_send_sold_email(to_send) except EmailSendError as err: log_error("Failed to send sold email", err, to_send) else: log_success("Send sold email", to_send)多個可執(zhí)行任務(wù)
因?yàn)锽LPOP命令每次只會從隊(duì)列里面彈出一封待發(fā)送郵件,所以待發(fā)送郵件不會出現(xiàn)重復(fù),也不會被重復(fù)發(fā)送。并且因?yàn)殛?duì)列只會存放待發(fā)送郵件,所以工作進(jìn)程要處理的任務(wù)是非常單一的。下面代碼的工作進(jìn)程會監(jiān)視用戶提供的多個隊(duì)列,并從多個已知的已注冊回調(diào)函數(shù)里面,選出一個函數(shù)來處理JSON編碼的函數(shù)調(diào)用。
def worker_watch_queue(conn, queue, callback): while not QUIT: packed = conn.blpop([queue], 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: //沒有找到任務(wù)指定的回調(diào)函數(shù),用日志記錄錯誤并重試 log_error("Unknown callback %s"%name) continue callbacks[name](*args) //執(zhí)行任務(wù)任務(wù)優(yōu)先級
在使用隊(duì)列的時候,程序可能會需要讓特定的操作優(yōu)先于其他操作執(zhí)行。
假設(shè)現(xiàn)在我們需要為任務(wù)設(shè)置高、中、低3種優(yōu)先級別,其中:高優(yōu)先級任務(wù)在出現(xiàn)之后會第一時間被執(zhí)行,而中等優(yōu)先級任務(wù)則會在沒有任何高優(yōu)先級任務(wù)存在的情況下被執(zhí)行,而低優(yōu)先級任務(wù)則會在既沒有任何高優(yōu)先級任務(wù),又沒有任何中等優(yōu)先級任務(wù)的情況下被執(zhí)行。
def worker_watch_queues(conn, queues, callbacks): while not QUIT: packed = conn.blpop(queues, 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: log_error("Unknown callback %s"%name) continue callbacks[name](*args)
同時使用多個隊(duì)列可以降低實(shí)現(xiàn)優(yōu)先級特性的難度。除此之外,多隊(duì)列有時候也會被用于分隔不同的任務(wù)(如同一個隊(duì)列存放公告郵件,而另一個隊(duì)列則存放提醒郵件),在這種情況下,處理不同隊(duì)列時可能出現(xiàn)不公平現(xiàn)象。為此,我們可以偶爾重新排列各個隊(duì)列的順序,使得針對隊(duì)列的處理操作變得更公平一些,當(dāng)某個隊(duì)列的增長速度比其他隊(duì)列的增長速度快的時候,這種重拍操作尤為重要。
延遲任務(wù)使用列表結(jié)構(gòu)可以實(shí)現(xiàn)只能執(zhí)行一種任務(wù)的隊(duì)列,也可以實(shí)現(xiàn)通過調(diào)用不同回調(diào)函數(shù)來執(zhí)行不同任務(wù)的隊(duì)列,甚至還可以實(shí)現(xiàn)簡單的優(yōu)先級隊(duì)列。
以下3種方法可以為隊(duì)列中的任務(wù)添加延遲性質(zhì):
在任務(wù)信息中包含任務(wù)的執(zhí)行時間,如果工作進(jìn)程發(fā)現(xiàn)任務(wù)的執(zhí)行時間尚未來臨,那么它將在短暫等待之后,把任務(wù)重新推入隊(duì)列里面。
工作進(jìn)程使用一個本地的等待列表來記錄所有需要在未來執(zhí)行的任務(wù),并在每次進(jìn)行while循環(huán)的時候,檢查等待列表并執(zhí)行那些已經(jīng)到期的任務(wù)。
把所有需要在未來執(zhí)行的任務(wù)都添加到有序集合里面,并將任務(wù)的執(zhí)行時間設(shè)置為分值,另外再使用一個進(jìn)程來查找有序集合里面是否存在可以立即被執(zhí)行的任務(wù),如果有的話,就從有序集合里面移除那個任務(wù),并將它添加到適當(dāng)?shù)萌蝿?wù)隊(duì)列里面。
因?yàn)闊o論是進(jìn)行短暫的等待,還是將任務(wù)重新推入隊(duì)列里面,都會浪費(fèi)工作進(jìn)程的時間,所以我們不會采用第一種方法。此外,因?yàn)楣ぷ鬟M(jìn)程可能會因?yàn)楸罎⒍鴣G失本地記錄的所有待執(zhí)行任務(wù),所以我們也不會采用第二種方法。最后,因?yàn)槭褂糜行蚣系牡谌N方法最簡單直接,所以我們將采取這一方法,并使用鎖來保證任務(wù)從有序集合移動到任務(wù)隊(duì)列時的安全性。
有序集合隊(duì)列(ZSET queue)存儲的每個被延遲的任務(wù)都是一個包含4個值的JSON列表,這4個分值分別是:唯一標(biāo)識符、處理任務(wù)隊(duì)列的名字、處理任務(wù)的回調(diào)函數(shù)的名字、傳給回調(diào)函數(shù)的參數(shù)。在有序集合里面,任務(wù)的分值會被設(shè)置為任務(wù)的執(zhí)行時間,而立即可執(zhí)行的任務(wù)將被直接插入任務(wù)隊(duì)列里面。下面代碼展示了創(chuàng)建延遲任務(wù)(任務(wù)是否延遲是可選的,只要把任務(wù)的延遲時間設(shè)置為0就可以創(chuàng)建一個立即執(zhí)行的任務(wù))。
def execute_later(conn, queue, name, args, delay=0): identifier = str(uuid.uuid4()) item = json.dumps([identifier, queue, name, args]) if delay > 0: conn.zadd("delayed:", item, time.time() + delay) else: conn.rpush("queue:" + queue, item) return identifier
因?yàn)镽edis沒有提供直接的方法可以阻塞有序集合直到元素的分值低于當(dāng)前UNIX時間戳為止,所以我們需要自己來查找有序集合里面分值低于當(dāng)前UNIX時間戳的任務(wù)。因?yàn)樗斜谎舆t的任務(wù)都存儲在同一個有序集合隊(duì)列里面,所以程序只需要獲取有序集合里面排名第一的元素以及該元素的分值就可以了:如果隊(duì)列里面沒有任何任務(wù),或者任務(wù)的執(zhí)行時間尚未來臨,那么程序?qū)⒃诙虝旱却笾卦?;如果任?wù)的執(zhí)行時間已到,那么程序?qū)⒏鶕?jù)任務(wù)包含的標(biāo)識符來獲取一個細(xì)粒度鎖,接著從有序集合里面移除要被執(zhí)行的任務(wù),并將它添加到適當(dāng)?shù)娜蝿?wù)隊(duì)列里面。通過將可執(zhí)行的任務(wù)添加到任務(wù)隊(duì)列里面而不是直接執(zhí)行它們,我們可以把獲取可執(zhí)行任務(wù)的進(jìn)程數(shù)量限制在一兩個之內(nèi),而不必根據(jù)工作進(jìn)程的數(shù)量來決定運(yùn)行多少個獲取進(jìn)程,這減少了獲取可執(zhí)行任務(wù)所需的花銷。
def poll_queue(conn): while not QUIT: item = conn.zrange("delayed:", 0, 0, withscores=True) if not item or item[0][1] > time.time(): time.sleep(.01) continue item = item[0][0] identifier, queue, function, args = json.loads(item) locked = acquire_lock(conn, identifier) if not locked: continue if conn.zrem("delayed:", item): conn.rpush("queue:" + queue, item) release_lock(conn, identifier, locked)
因?yàn)橛行蚣喜⒉痪邆湎窳斜砟菢拥淖枞麖棾鰴C(jī)制,所以程序需要不斷地進(jìn)行循環(huán),并嘗試從隊(duì)列里面獲取要被執(zhí)行的任務(wù),雖然這一操作會增大網(wǎng)絡(luò)和處理器的負(fù)載,但因?yàn)槲覀冎粫\(yùn)行一兩個這樣的程序,所以不會消耗太多資源。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/44607.html
摘要:如果任務(wù)沒有在規(guī)定時間內(nèi)完成,那么該有序集合的任務(wù)將會被重新放入隊(duì)列中。這兩個進(jìn)程操縱了三個隊(duì)列,其中一個,負(fù)責(zé)即時任務(wù),兩個,負(fù)責(zé)延時任務(wù)與待處理任務(wù)。如果任務(wù)執(zhí)行成功,就會刪除中的任務(wù),否則會被重新放入隊(duì)列中。 在實(shí)際的項(xiàng)目開發(fā)中,我們經(jīng)常會遇到需要輕量級隊(duì)列的情形,例如發(fā)短信、發(fā)郵件等,這些任務(wù)不足以使用 kafka、RabbitMQ 等重量級的消息隊(duì)列,但是又的確需要異步、重試...
摘要:配置項(xiàng)用于配置失敗隊(duì)列任務(wù)存放的數(shù)據(jù)庫及數(shù)據(jù)表。要使用隊(duì)列驅(qū)動,需要在配置文件中配置數(shù)據(jù)庫連接。如果應(yīng)用使用了,那么可以使用時間或并發(fā)來控制隊(duì)列任務(wù)。你可以使用命令運(yùn)行這個隊(duì)列進(jìn)程。如果隊(duì)列進(jìn)程意外關(guān)閉,它會自動重啟啟動隊(duì)列進(jìn)程。 一、概述 在Web開發(fā)中,我們經(jīng)常會遇到需要批量處理任務(wù)的場景,比如群發(fā)郵件、秒殺資格獲取等,我們將這些耗時或者高并發(fā)的操作放到隊(duì)列中異步執(zhí)行可以有效緩解系...
摘要:場景說明用于處理比較耗時的請求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁觸發(fā)執(zhí)行發(fā)送,程序會出現(xiàn)超時高并發(fā)場景,當(dāng)某個時刻請求瞬間增加時,可以把請求寫入到隊(duì)列,后臺在去處理這些請求搶購場景,先入先出的模式命令或往列表右側(cè)推入數(shù)據(jù)客戶端阻塞直到隊(duì)列有 場景說明: 用于處理比較耗時的請求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁觸發(fā)執(zhí)行發(fā)送,程序會出現(xiàn)超時 高并發(fā)場景,當(dāng)某個時刻請求瞬間增加時,可以把請...
閱讀 1845·2021-09-22 15:25
閱讀 1360·2019-08-29 12:34
閱讀 1989·2019-08-26 13:57
閱讀 3264·2019-08-26 10:48
閱讀 1496·2019-08-26 10:45
閱讀 860·2019-08-23 18:23
閱讀 793·2019-08-23 18:01
閱讀 2006·2019-08-23 16:07