亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

Laravel 基于redis隊(duì)列的解析

banana_pi / 3243人閱讀

摘要:年月日參考鏈接使用不得不明白的知識隊(duì)列文檔中文文檔本文環(huán)境隊(duì)列為什么使用隊(duì)列使用隊(duì)列的目的一般是異步執(zhí)行出錯重試解釋一下異步執(zhí)行部分代碼執(zhí)行很耗時(shí)為了提高響應(yīng)速度及避免占用過多連接資源可以將這部分代碼放到隊(duì)列中異步執(zhí)行網(wǎng)站新用戶注冊后需要

Last-Modified: 2019年5月10日15:04:22

參考鏈接

使用 Laravel Queue 不得不明白的知識

Laravel 隊(duì)列文檔

Redis 中文文檔

本文環(huán)境

Laravel 5.5

隊(duì)列 Redis

為什么使用隊(duì)列

使用隊(duì)列的目的一般是:

異步執(zhí)行

出錯重試

解釋一下:

異步執(zhí)行: 部分代碼執(zhí)行很耗時(shí), 為了提高響應(yīng)速度及避免占用過多連接資源, 可以將這部分代碼放到隊(duì)列中異步執(zhí)行.

Eg. 網(wǎng)站新用戶注冊后, 需要發(fā)送歡迎的郵件, 涉及到網(wǎng)絡(luò)IO無法控制耗時(shí)的這一類就很適合放到隊(duì)列中來執(zhí)行.

出錯重試: 為了保證一些任務(wù)的正常執(zhí)行, 可以將任務(wù)放到隊(duì)列中執(zhí)行, 若執(zhí)行出錯則可以延遲一段時(shí)間后重試, 直到任務(wù)處理成功或出錯超過N次后取消執(zhí)行.

Eg. 用戶需要綁定手機(jī)號, 此時(shí)發(fā)送短信的接口是依賴第三方, 一個是不確定耗時(shí), 一個是不確定調(diào)用的成功, 為了保證調(diào)用成功, 必然需要在出錯后重試
Laravel 中的隊(duì)列

以下分析默認(rèn)使用的隊(duì)列及其配置如下

默認(rèn)隊(duì)列引擎: redis

通過在 redis-cli 中使用 monitor 命令查看具體執(zhí)行的命令語句

默認(rèn)隊(duì)列名: default

分發(fā)任務(wù)

此處以分發(fā) 異步通知(class XxxNotification implement ShouldQueue)為例.

在Laravel中發(fā)起異步通知時(shí), Laravel 會往redis中的任務(wù)隊(duì)列添加一條新任務(wù)

redis 執(zhí)行語句

redis> RPUSH queues:default

{
    "displayName": "AppListenersRebateEventListener",
    "job": "IlluminateQueueCallQueuedHandler@call",
    "maxTries": null,
    "timeout": null,
    "timeoutAt": null,
    "data": {
        "commandName": "IlluminateEventsCallQueuedListener",
        "command": "O:36:"IlluminateEventsCallQueuedListener":7:{s:5:"class";s:33:"AppListenersRebateEventListener";s:6:"method";s:15:"onRebateCreated";s:4:"data";a:1:{i:0;O:29:"AppEventsRebateCreatedEvent":4:{s:11:"u0000*u0000tbkOrder";O:45:"IlluminateContractsDatabaseModelIdentifier":3:{s:5:"class";s:19:"AppModelsTbkOrder";s:2:"id";i:416;s:10:"connection";s:5:"mysql";}s:15:"u0000*u0000notifyAdmins";b:1;s:13:"u0000*u0000manualBind";b:0;s:6:"socket";N;}}s:5:"tries";N;s:9:"timeoutAt";N;s:7:"timeout";N;s:6:"u0000*u0000job";N;}"
    },
    "id": "iTqpbeDqqFb3VoED2WP3pgmDbLAUQcMB",
    "attempts": 0
}

上面的redis語句是將任務(wù)信息(json格式) rpush 到 redis 隊(duì)列 queues:default 的尾部.

任務(wù)隊(duì)列 Worker

Laravel 處理任務(wù)隊(duì)列的進(jìn)程開啟方式: php artisan queue:work, 為了更好的觀察, 這里使用 --once 選項(xiàng)來指定隊(duì)列中的單一任務(wù)進(jìn)行處理, 具體的更多參數(shù)請自行參考文檔

php artisan queue:work --once --delay=1 --tries=3

上述執(zhí)行語句參數(shù)含義:

--once 僅執(zhí)行一次任務(wù), 默認(rèn)是常駐進(jìn)程一直執(zhí)行

--tries=3 任務(wù)出錯最多重試3次, 默認(rèn)是無限制重試

--delay=1 任務(wù)出錯后, 每次延遲1秒后再次執(zhí)行, 默認(rèn)是延遲0秒

當(dāng) Worker 啟動時(shí), 它依次執(zhí)行如下步驟:

此處仍以默認(rèn)隊(duì)列 default 為例講解, 且只講解redis的相關(guān)操作

queues:default:delayed 有序集合中獲取可以處理的 "延遲任務(wù)", 并 rpushqueue:default隊(duì)列的尾部

具體的執(zhí)行語句:

redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當(dāng)前時(shí)間戳

Lua 腳本內(nèi)容如下:

-- Get all of the jobs with an expired "score"...
local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto thedestination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call("zremrangebyrank", KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end

return val 

queue:default:reserved有序集合中獲取已過期的 "reserved 任務(wù)", 并 rpushqueue:default隊(duì)列的尾部

具體的執(zhí)行語句:

redis> eval "Lua腳本" 2 queues:default:reserved queues:default 當(dāng)前時(shí)間戳

使用的Lua腳本同步驟 1

queue:default 隊(duì)列中獲取(lpop)一個任務(wù), 增加其 attempts 次數(shù), 并將該任務(wù)保存到 queu:default:reserved 有序集合中, 該任務(wù)的 score 值為 當(dāng)前時(shí)間 + 90(任務(wù)執(zhí)行超時(shí)時(shí)間)

具體的執(zhí)行語句:

redis> eval “Lua腳本” 2 queues:default queues:default:reserved 任務(wù)超時(shí)時(shí)間戳

Lua腳本

-- Pop the first job off of the queue...
local job = redis.call("lpop", KEYS[1])
local reserved = false

if(job ~= false) then
    -- Increment the attempt count and place job on the reserved queue...
    reserved = cjson.decode(job)
    reserved["attempts"] = reserved["attempts"] + 1
    reserved = cjson.encode(reserved)
    redis.call("zadd", KEYS[2], ARGV[1], reserved)
end

return {job, reserved}
這里的 90 是根據(jù)配置而定:  config("queue.connections.redis.retry_after")

若預(yù)計(jì)任務(wù)耗時(shí)過久, 則應(yīng)增加該數(shù)值, 防止任務(wù)還在執(zhí)行時(shí)就被重置

在成功執(zhí)行上面獲取的任務(wù)后, 就將該任務(wù)從 queues:default:reserved 隊(duì)列中移除掉

具體執(zhí)行語句: ZREM queues:default:reserved "具體任務(wù)"

如果執(zhí)行任務(wù)失敗, 此時(shí)分為2種情況:

任務(wù)失敗次數(shù)未達(dá)到指定的重試次數(shù)閥值

將該任務(wù)從 queues:default:reserved 中移除, 并將該任務(wù)添加到 queue:default:delayed 有序集合中, score 為該任務(wù)下一次執(zhí)行的時(shí)間戳

執(zhí)行語句:

redis> EVAL "Lua腳本" 2 queues:default:delayed queues:default:reserved "失敗的任務(wù)" 任務(wù)延遲執(zhí)行的時(shí)間戳

Lua腳本

-- Remove the job from the current queue...
redis.call("zrem", KEYS[2], ARGV[1])

-- Add the job onto the "delayed" queue...
redis.call("zadd", KEYS[1], ARGV[2], ARGV[1])

return true

如果任務(wù)失敗次數(shù)超過指定的重試閥值

將該任務(wù)從 queue:default:reserved 中移除

執(zhí)行語句:

redis> ZREM queue:default:reserved

注意, 上述使用 Lua 腳本的目的在于操作的原子性, Redis 是單進(jìn)程單線程模式, 以Lua腳本形式執(zhí)行命令時(shí)可以確保執(zhí)行腳本的原子性, 而不會有并發(fā)問題.

關(guān)于Redis的原子操作

上面 Laravel 使用redis作為隊(duì)列存儲引擎時(shí), 在操作redis時(shí)使用到了 exec 執(zhí)行Lua腳本, 以確保原子性.

這里給不熟悉redis的同學(xué)簡單講一下.

以上面 Worker 啟動時(shí)的步驟1為例:

queues:default:delayed 有序集合中獲取可以處理的 "延遲任務(wù)", 并 rpushqueue:default隊(duì)列的尾部

具體的執(zhí)行語句:

redis> eval "Lua腳本" 2 queues:default:delayed queues:default 當(dāng)前時(shí)間戳

Lua 腳本內(nèi)容如下:

-- Get all of the jobs with an expired "score"...
local val = redis.call("zrangebyscore", KEYS[1], "-inf", ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto thedestination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call("zremrangebyrank", KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call("rpush", KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end

return val 

上述步驟首先從 queues:default:delayed 有序集合中獲取可以處理的 "延遲任務(wù)" 并 rpushqueue:default隊(duì)列的尾部.

那么如果不使用Lua腳本的話, 一般做法會是如下:

$jobs = $redis->zRangeByScore("queues:default:delayed", "-inf", time())
if (!empty($jobs)) {
    $redis->zRem("queues:default:delayed", ...$jobs);
    $redis->rPush("queues:default", ...$jobs);   
}

如果是單個Worker的話, 上述腳本不會有問題, 但是如果有多個Worker呢? 在php層面上執(zhí)行上述操作是會有并發(fā)問題的.

Worker_1 和 Worker_2 從 queues:default:delayed 隊(duì)列中獲取多個任務(wù)后, 執(zhí)行 rPush 語句會導(dǎo)致任務(wù)被執(zhí)行2次, 如果有多個 Worker 甚至?xí)?zhí)行更多次.

只要是有可能引起并發(fā)問題的情況, 那么就一定會發(fā)生.
以 分布式鎖 為例

鎖的兩大基本操作:

Lock

Unlock

Lock 操作

// 生成唯一的鎖id
$identifier = uniqid(php_uname("n") . "_", true);
// 僅在該key不存在時(shí)設(shè)置, 過期時(shí)間5秒
$result = $redis->set("lock_key", $identifier, ["NX", "EX" => 5]);

Unlock 操作

$script = <<evaluate($script, ["lock_key", $identifier], 1);

至于 Unlock 操作為什么要這么麻煩, 可以看一下以下兩種有問題的方案, 再想一想.

有問題的方案一

$redis->del("lock_key");

有問題的方案二

if ($redis->get("lock_key") == $identifier) {
    $redis->del("lock_key");
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/31416.html

相關(guān)文章

  • 分析Laravel隊(duì)列實(shí)現(xiàn)原理解決問題記錄

    摘要:在使用中的隊(duì)列時(shí),產(chǎn)生沖突干擾。文件中的配置部分至此,兩個項(xiàng)目的隊(duì)列沖突原因就找到了。隊(duì)列監(jiān)聽最后遇到問題,莫要病急亂投醫(yī)。從代碼入手,分析理解實(shí)現(xiàn)原理,找對點(diǎn),解決方法也許很簡單,。 問題 公司項(xiàng)目使用Laravel的開發(fā)的兩個項(xiàng)目在同一個測試服務(wù)器部署,公用同一個redis。在使用laravel中的隊(duì)列時(shí),產(chǎn)生沖突干擾。 查找問題原因 在laravel 隊(duì)列的操作類 Illumin...

    Corwien 評論0 收藏0
  • 基于 Redis驅(qū)動 Laravel 事件廣播

    摘要:一前言之前在項(xiàng)目中需要使用的事件廣播,而且項(xiàng)目打算使用作為驅(qū)動,但發(fā)現(xiàn)網(wǎng)上的資料大部分都是驅(qū)動的,只能自己摸索著搭建了一下服務(wù)。 一、前言 之前在項(xiàng)目中需要使用laravel的事件廣播,而且項(xiàng)目打算使用redis作為驅(qū)動,但發(fā)現(xiàn)網(wǎng)上的資料大部分都是Pusher驅(qū)動的,只能自己摸索著搭建了一下服務(wù)。現(xiàn)在將這個過程記錄一下,希望能幫到其他人。 二、項(xiàng)目的環(huán)境 事件廣播需要用到redis,n...

    fantix 評論0 收藏0
  • laravel 隊(duì)列

    摘要:如果任務(wù)沒有在規(guī)定時(shí)間內(nèi)完成,那么該有序集合的任務(wù)將會被重新放入隊(duì)列中。這兩個進(jìn)程操縱了三個隊(duì)列,其中一個,負(fù)責(zé)即時(shí)任務(wù),兩個,負(fù)責(zé)延時(shí)任務(wù)與待處理任務(wù)。如果任務(wù)執(zhí)行成功,就會刪除中的任務(wù),否則會被重新放入隊(duì)列中。 在實(shí)際的項(xiàng)目開發(fā)中,我們經(jīng)常會遇到需要輕量級隊(duì)列的情形,例如發(fā)短信、發(fā)郵件等,這些任務(wù)不足以使用 kafka、RabbitMQ 等重量級的消息隊(duì)列,但是又的確需要異步、重試...

    BDEEFE 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<