摘要:年月日參考鏈接使用不得不明白的知識隊(duì)列文檔中文文檔本文環(huán)境隊(duì)列為什么使用隊(duì)列使用隊(duì)列的目的一般是異步執(zhí)行出錯重試解釋一下異步執(zhí)行部分代碼執(zhí)行很耗時(shí)為了提高響應(yīng)速度及避免占用過多連接資源可以將這部分代碼放到隊(duì)列中異步執(zhí)行網(wǎng)站新用戶注冊后需要
Last-Modified: 2019年5月10日15:04:22
參考鏈接使用 Laravel Queue 不得不明白的知識
Laravel 隊(duì)列文檔
Redis 中文文檔
本文環(huán)境Laravel 5.5
隊(duì)列 Redis
為什么使用隊(duì)列使用隊(duì)列的目的一般是:
異步執(zhí)行
出錯重試
解釋一下:
異步執(zhí)行: 部分代碼執(zhí)行很耗時(shí), 為了提高響應(yīng)速度及避免占用過多連接資源, 可以將這部分代碼放到隊(duì)列中異步執(zhí)行.
Eg. 網(wǎng)站新用戶注冊后, 需要發(fā)送歡迎的郵件, 涉及到網(wǎng)絡(luò)IO無法控制耗時(shí)的這一類就很適合放到隊(duì)列中來執(zhí)行.
出錯重試: 為了保證一些任務(wù)的正常執(zhí)行, 可以將任務(wù)放到隊(duì)列中執(zhí)行, 若執(zhí)行出錯則可以延遲一段時(shí)間后重試, 直到任務(wù)處理成功或出錯超過N次后取消執(zhí)行.
Eg. 用戶需要綁定手機(jī)號, 此時(shí)發(fā)送短信的接口是依賴第三方, 一個是不確定耗時(shí), 一個是不確定調(diào)用的成功, 為了保證調(diào)用成功, 必然需要在出錯后重試Laravel 中的隊(duì)列
以下分析默認(rèn)使用的隊(duì)列及其配置如下
默認(rèn)隊(duì)列引擎: redis
通過在 redis-cli 中使用 monitor 命令查看具體執(zhí)行的命令語句
默認(rèn)隊(duì)列名: default
分發(fā)任務(wù)此處以分發(fā) 異步通知(class XxxNotification implement ShouldQueue)為例.
在Laravel中發(fā)起異步通知時(shí), Laravel 會往redis中的任務(wù)隊(duì)列添加一條新任務(wù)
redis 執(zhí)行語句
redis> RPUSH queues:default { "displayName": "AppListenersRebateEventListener", "job": "IlluminateQueueCallQueuedHandler@call", "maxTries": null, "timeout": null, "timeoutAt": null, "data": { "commandName": "IlluminateEventsCallQueuedListener", "command": "O:36:"IlluminateEventsCallQueuedListener":7:{s:5:"class";s:33:"AppListenersRebateEventListener";s:6:"method";s:15:"onRebateCreated";s:4:"data";a:1:{i:0;O:29:"AppEventsRebateCreatedEvent":4:{s:11:"u0000*u0000tbkOrder";O:45:"IlluminateContractsDatabaseModelIdentifier":3:{s:5:"class";s:19:"AppModelsTbkOrder";s:2:"id";i:416;s:10:"connection";s:5:"mysql";}s:15:"u0000*u0000notifyAdmins";b:1;s:13:"u0000*u0000manualBind";b:0;s:6:"socket";N;}}s:5:"tries";N;s:9:"timeoutAt";N;s:7:"timeout";N;s:6:"u0000*u0000job";N;}" }, "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB", "attempts": 0 }
上面的redis語句是將任務(wù)信息(json格式) rpush 到 redis 隊(duì)列 queues:default 的尾部.
任務(wù)隊(duì)列 WorkerLaravel 處理任務(wù)隊(duì)列的進(jìn)程開啟方式: php artisan queue:work, 為了更好的觀察, 這里使用 --once 選項(xiàng)來指定隊(duì)列中的單一任務(wù)進(jìn)行處理, 具體的更多參數(shù)請自行參考文檔
php artisan queue:work --once --delay=1 --tries=3
上述執(zhí)行語句參數(shù)含義:
--once 僅執(zhí)行一次任務(wù), 默認(rèn)是常駐進(jìn)程一直執(zhí)行
--tries=3 任務(wù)出錯最多重試3次, 默認(rèn)是無限制重試
--delay=1 任務(wù)出錯后, 每次延遲1秒后再次執(zhí)行, 默認(rèn)是延遲0秒
當(dāng) Worker 啟動時(shí), 它依次執(zhí)行如下步驟:
此處仍以默認(rèn)隊(duì)列 default 為例講解, 且只講解redis的相關(guān)操作
從 queues:default:delayed 有序集合中獲取可以處理的 "延遲任務(wù)", 并 rpush 到 queue:default隊(duì)列的尾部
具體的執(zhí)行語句:
redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當(dāng)前時(shí)間戳
Lua 腳本內(nèi)容如下:
-- Get all of the jobs with an expired "score"... local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1]) -- If we have values in the array, we will remove them from the first queue -- and add them onto thedestination queue in chunks of 100, which moves -- all of the appropriate jobs onto the destination queue very safely. if(next(val) ~= nil) then redis.call("zremrangebyrank", KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return val
從 queue:default:reserved有序集合中獲取已過期的 "reserved 任務(wù)", 并 rpush 到 queue:default隊(duì)列的尾部
具體的執(zhí)行語句:
redis> eval "Lua腳本" 2 queues:default:reserved queues:default 當(dāng)前時(shí)間戳
使用的Lua腳本同步驟 1
從 queue:default 隊(duì)列中獲取(lpop)一個任務(wù), 增加其 attempts 次數(shù), 并將該任務(wù)保存到 queu:default:reserved 有序集合中, 該任務(wù)的 score 值為 當(dāng)前時(shí)間 + 90(任務(wù)執(zhí)行超時(shí)時(shí)間)
具體的執(zhí)行語句:
redis> eval “Lua腳本” 2 queues:default queues:default:reserved 任務(wù)超時(shí)時(shí)間戳
Lua腳本
-- Pop the first job off of the queue... local job = redis.call("lpop", KEYS[1]) local reserved = false if(job ~= false) then -- Increment the attempt count and place job on the reserved queue... reserved = cjson.decode(job) reserved["attempts"] = reserved["attempts"] + 1 reserved = cjson.encode(reserved) redis.call("zadd", KEYS[2], ARGV[1], reserved) end return {job, reserved}
這里的 90 是根據(jù)配置而定: config("queue.connections.redis.retry_after")若預(yù)計(jì)任務(wù)耗時(shí)過久, 則應(yīng)增加該數(shù)值, 防止任務(wù)還在執(zhí)行時(shí)就被重置
在成功執(zhí)行上面獲取的任務(wù)后, 就將該任務(wù)從 queues:default:reserved 隊(duì)列中移除掉
具體執(zhí)行語句: ZREM queues:default:reserved "具體任務(wù)"
如果執(zhí)行任務(wù)失敗, 此時(shí)分為2種情況:
任務(wù)失敗次數(shù)未達(dá)到指定的重試次數(shù)閥值
將該任務(wù)從 queues:default:reserved 中移除, 并將該任務(wù)添加到 queue:default:delayed 有序集合中, score 為該任務(wù)下一次執(zhí)行的時(shí)間戳
執(zhí)行語句:
redis> EVAL "Lua腳本" 2 queues:default:delayed queues:default:reserved "失敗的任務(wù)" 任務(wù)延遲執(zhí)行的時(shí)間戳
Lua腳本
-- Remove the job from the current queue... redis.call("zrem", KEYS[2], ARGV[1]) -- Add the job onto the "delayed" queue... redis.call("zadd", KEYS[1], ARGV[2], ARGV[1]) return true
如果任務(wù)失敗次數(shù)超過指定的重試閥值
將該任務(wù)從 queue:default:reserved 中移除
執(zhí)行語句:
redis> ZREM queue:default:reserved
注意, 上述使用 Lua 腳本的目的在于操作的原子性, Redis 是單進(jìn)程單線程模式, 以Lua腳本形式執(zhí)行命令時(shí)可以確保執(zhí)行腳本的原子性, 而不會有并發(fā)問題.
關(guān)于Redis的原子操作上面 Laravel 使用redis作為隊(duì)列存儲引擎時(shí), 在操作redis時(shí)使用到了 exec 執(zhí)行Lua腳本, 以確保原子性.
這里給不熟悉redis的同學(xué)簡單講一下.
以上面 Worker 啟動時(shí)的步驟1為例:
從 queues:default:delayed 有序集合中獲取可以處理的 "延遲任務(wù)", 并 rpush 到 queue:default隊(duì)列的尾部
具體的執(zhí)行語句:
redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當(dāng)前時(shí)間戳Lua 腳本內(nèi)容如下:
-- Get all of the jobs with an expired "score"... local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1]) -- If we have values in the array, we will remove them from the first queue -- and add them onto thedestination queue in chunks of 100, which moves -- all of the appropriate jobs onto the destination queue very safely. if(next(val) ~= nil) then redis.call("zremrangebyrank", KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return val
上述步驟首先從 queues:default:delayed 有序集合中獲取可以處理的 "延遲任務(wù)" 并 rpush 到 queue:default隊(duì)列的尾部.
那么如果不使用Lua腳本的話, 一般做法會是如下:
$jobs = $redis->zRangeByScore("queues:default:delayed", "-inf", time()) if (!empty($jobs)) { $redis->zRem("queues:default:delayed", ...$jobs); $redis->rPush("queues:default", ...$jobs); }
如果是單個Worker的話, 上述腳本不會有問題, 但是如果有多個Worker呢? 在php層面上執(zhí)行上述操作是會有并發(fā)問題的.
Worker_1 和 Worker_2 從 queues:default:delayed 隊(duì)列中獲取多個任務(wù)后, 執(zhí)行 rPush 語句會導(dǎo)致任務(wù)被執(zhí)行2次, 如果有多個 Worker 甚至?xí)?zhí)行更多次.
只要是有可能引起并發(fā)問題的情況, 那么就一定會發(fā)生.以 分布式鎖 為例
鎖的兩大基本操作:
Lock
Unlock
Lock 操作
// 生成唯一的鎖id $identifier = uniqid(php_uname("n") . "_", true); // 僅在該key不存在時(shí)設(shè)置, 過期時(shí)間5秒 $result = $redis->set("lock_key", $identifier, ["NX", "EX" => 5]);
Unlock 操作
$script = <<evaluate($script, ["lock_key", $identifier], 1);
至于 Unlock 操作為什么要這么麻煩, 可以看一下以下兩種有問題的方案, 再想一想.
有問題的方案一
$redis->del("lock_key");
有問題的方案二
if ($redis->get("lock_key") == $identifier) { $redis->del("lock_key"); }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/31416.html
摘要:在使用中的隊(duì)列時(shí),產(chǎn)生沖突干擾。文件中的配置部分至此,兩個項(xiàng)目的隊(duì)列沖突原因就找到了。隊(duì)列監(jiān)聽最后遇到問題,莫要病急亂投醫(yī)。從代碼入手,分析理解實(shí)現(xiàn)原理,找對點(diǎn),解決方法也許很簡單,。 問題 公司項(xiàng)目使用Laravel的開發(fā)的兩個項(xiàng)目在同一個測試服務(wù)器部署,公用同一個redis。在使用laravel中的隊(duì)列時(shí),產(chǎn)生沖突干擾。 查找問題原因 在laravel 隊(duì)列的操作類 Illumin...
摘要:一前言之前在項(xiàng)目中需要使用的事件廣播,而且項(xiàng)目打算使用作為驅(qū)動,但發(fā)現(xiàn)網(wǎng)上的資料大部分都是驅(qū)動的,只能自己摸索著搭建了一下服務(wù)。 一、前言 之前在項(xiàng)目中需要使用laravel的事件廣播,而且項(xiàng)目打算使用redis作為驅(qū)動,但發(fā)現(xiàn)網(wǎng)上的資料大部分都是Pusher驅(qū)動的,只能自己摸索著搭建了一下服務(wù)。現(xiàn)在將這個過程記錄一下,希望能幫到其他人。 二、項(xiàng)目的環(huán)境 事件廣播需要用到redis,n...
摘要:如果任務(wù)沒有在規(guī)定時(shí)間內(nèi)完成,那么該有序集合的任務(wù)將會被重新放入隊(duì)列中。這兩個進(jìn)程操縱了三個隊(duì)列,其中一個,負(fù)責(zé)即時(shí)任務(wù),兩個,負(fù)責(zé)延時(shí)任務(wù)與待處理任務(wù)。如果任務(wù)執(zhí)行成功,就會刪除中的任務(wù),否則會被重新放入隊(duì)列中。 在實(shí)際的項(xiàng)目開發(fā)中,我們經(jīng)常會遇到需要輕量級隊(duì)列的情形,例如發(fā)短信、發(fā)郵件等,這些任務(wù)不足以使用 kafka、RabbitMQ 等重量級的消息隊(duì)列,但是又的確需要異步、重試...
閱讀 3654·2021-11-24 10:19
閱讀 3802·2021-09-30 09:47
閱讀 1369·2019-08-30 15:56
閱讀 858·2019-08-29 15:11
閱讀 963·2019-08-29 13:43
閱讀 3644·2019-08-28 18:25
閱讀 2220·2019-08-26 13:27
閱讀 1491·2019-08-26 11:44