亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

剖析Laravel隊列系統(tǒng)--Worker

CollinPeng / 896人閱讀

摘要:一旦這一切完成,方法會運行在類屬性在命令構造后設置容器解析實例,在中我們設置了將使用的緩存驅動,我們也根據命令來決定我們調用什么方法。作業(yè)只在以上起效在上也無效處理作業(yè)方法調用觸發(fā)事件觸發(fā)事件。

譯文GitHub https://github.com/yuansir/diving-laravel-zh

原文鏈接https://divinglaravel.com/queue-system/workers

現在,我們知道了Laravel如何將作業(yè)推到不同的隊列中,讓我們來深入了解workers如何運作你的作業(yè)。 首先,我將workers定義為一個在后臺運行的簡單PHP進程,目的是從存儲空間中提取作業(yè)并針對多個配置選項運行它們。

php artisan queue:work

運行此命令將指示Laravel創(chuàng)建應用程序的一個實例并開始執(zhí)行作業(yè),這個實例將一直存活著,啟動Laravel應用程序的操作只在運行命令時發(fā)生一次,同一個實例將被用于執(zhí)行你的作業(yè),這意味著:

避免在每個作業(yè)上啟動整個應用程序來節(jié)省服務器資源。

在應用程序中所做的任何代碼更改后必須手動重啟worker。

你也可以這樣運行:

php artisan queue:work --once

這將啟動應用程序的一個實例,處理單個作業(yè),然后干掉腳本。

php artisan queue:listen

queue:listen 命令相當于無限循環(huán)地運行 queue:work --once 命令,這將導致以下問題:

每個循環(huán)都會啟動一個應用程序實例。

分配的worker將選擇一個工作并執(zhí)行。

worker進程將被干掉。

使用 queue:listen 確保為每個作業(yè)創(chuàng)建一個新的應用程序實例,這意味著代碼更改以后不必手動重啟worker,同時也意味著將消耗更多的服務器資源。

queue:work 命令

我們來看看 QueueConsoleWorkCommand 類的 handle() 方法,這是當你運行 php artisan queue:work 時會執(zhí)行的方法:

public function handle()
{
    if ($this->downForMaintenance() && $this->option("once")) {
        return $this->worker->sleep($this->option("sleep"));
    }

    $this->listenForEvents();

    $connection = $this->argument("connection")
                    ?: $this->laravel["config"]["queue.default"];

    $queue = $this->getQueue($connection);

    $this->runWorker(
        $connection, $queue
    );
}

首先,我們檢查應用程序是否處于維護模式,并使用 --once 選項,在這種情況下,我們希望腳本正常運行,因此我們不執(zhí)行任何作業(yè),我們只需要在完全殺死腳本前讓worker在一段時間內休眠。

QueueWorkersleep() 方法看起來像這樣:

public function sleep($seconds)
{
    sleep($seconds);
}
為什么我們不能在 handle() 方法中返回null來終止腳本?

如前所述, queue:listen 命令在循環(huán)中運行 WorkCommand

while (true) {
     // This process simply calls "php artisan queue:work --once"
    $this->runProcess($process, $options->memory);
}

如果應用程序處于維護模式,并且 WorkCommand 立即終止,這將導致循環(huán)結束,下一個在很短的時間內啟動,最好在這種情況下導致一些延遲,而不是通過創(chuàng)建我們不會真正使用的大量應用程序實例。

監(jiān)聽事件

handle() 方法里面我們調用 listenForEvents() 方法:

protected function listenForEvents()
{
    $this->laravel["events"]->listen(JobProcessing::class, function ($event) {
        $this->writeOutput($event->job, "starting");
    });

    $this->laravel["events"]->listen(JobProcessed::class, function ($event) {
        $this->writeOutput($event->job, "success");
    });

    $this->laravel["events"]->listen(JobFailed::class, function ($event) {
        $this->writeOutput($event->job, "failed");

        $this->logFailedJob($event);
    });
}

在這個方法中我們會監(jiān)聽幾個事件,這樣我們可以在每次作業(yè)處理中,處理完或處理失敗時向用戶打印一些信息。

記錄失敗作業(yè)

一旦作業(yè)失敗 logFailedJob() 方法會被調用

$this->laravel["queue.failer"]->log(
    $event->connectionName, $event->job->getQueue(),
    $event->job->getRawBody(), $event->exception
);

queue.failer 容器別名在 QueueQueueServiceProvider::registerFailedJobServices() 中注冊:

protected function registerFailedJobServices()
{
    $this->app->singleton("queue.failer", function () {
        $config = $this->app["config"]["queue.failed"];

        return isset($config["table"])
                    ? $this->databaseFailedJobProvider($config)
                    : new NullFailedJobProvider;
    });
}

/**
 * Create a new database failed job provider.
 *
 * @param  array  $config
 * @return IlluminateQueueFailedDatabaseFailedJobProvider
 */
protected function databaseFailedJobProvider($config)
{
    return new DatabaseFailedJobProvider(
        $this->app["db"], $config["database"], $config["table"]
    );
}

如果配置了 queue.failed ,則將使用數據庫隊列失敗,并將有關失敗作業(yè)的信息簡單地存儲在數據庫表中的:

$this->getTable()->insertGetId(compact(
    "connection", "queue", "payload", "exception", "failed_at"
));
運行worker

要運行worker,我們需要收集兩條信息:

worker的連接信息從作業(yè)中提取

worker找到作業(yè)的隊列

如果沒有使用 queue.default 配置定義的默認連接。您可以為 queue:work 命令提供 --connection=default 選項。

隊列也是一樣,您可以提供一個 --queue=emails 選項,或選擇連接配置中的 queue 選項。一旦這一切完成, WorkCommand::handle() 方法會運行 runWorker()

protected function runWorker($connection, $queue)
{
    $this->worker->setCache($this->laravel["cache"]->driver());

    return $this->worker->{$this->option("once") ? "runNextJob" : "daemon"}(
        $connection, $queue, $this->gatherWorkerOptions()
    );
}

在worker類屬性在命令構造后設置:

public function __construct(Worker $worker)
{
    parent::__construct();

    $this->worker = $worker;
}

容器解析 QueueWorker 實例,在runWorker()中我們設置了worker將使用的緩存驅動,我們也根據--once 命令來決定我們調用什么方法。

如果使用 --once 選項,我們只需調用 runNextJob 來運行下一個可用的作業(yè),然后腳本就會終止。 否則,我們將調用 daemon 方法來始終保持進程處理作業(yè)。

在開始工作時,我們使用 gatherWorkerOptions() 方法收集用戶給出的命令選項,我們稍后會提供這些選項,這個工具是 runNextJobdaemon 方法。

protected function gatherWorkerOptions()
{
    return new WorkerOptions(
        $this->option("delay"), $this->option("memory"),
        $this->option("timeout"), $this->option("sleep"),
        $this->option("tries"), $this->option("force")
    );
}
守護進程

讓我看看 Worker::daemon() 方法,這個方法的第一行調用了 Worker::daemon() 方法

protected function listenForSignals()
{
    if ($this->supportsAsyncSignals()) {
        pcntl_async_signals(true);

        pcntl_signal(SIGTERM, function () {
            $this->shouldQuit = true;
        });

        pcntl_signal(SIGUSR2, function () {
            $this->paused = true;
        });

        pcntl_signal(SIGCONT, function () {
            $this->paused = false;
        });
    }
}

這種方法使用PHP7.1的信號處理, supportsAsyncSignals() 方法檢查我們是否在PHP7.1上,并加載 pcntl 擴展名。

之后pcntl_async_signals() 被調用來啟用信號處理,然后我們?yōu)槎鄠€信號注冊處理程序:

當腳本被指示關閉時,會引發(fā)SIGTERM。

SIGUSR2是用戶定義的信號,Laravel用來表示腳本應該暫停。

當暫停的腳本繼續(xù)進行時,會引發(fā)SIGCONT。

這些信號從Process Monitor(如 Supervisor )發(fā)送并與我們的腳本進行通信。

Worker::daemon() 方法中的第二行讀取最后一個隊列重新啟動的時間戳,當我們調用queue:restart 時該值存儲在緩存中,稍后我們將檢查是否和上次重新啟動的時間戳不符合,來指示worker在之后多次重啟。

最后,該方法啟動一個循環(huán),在這個循環(huán)中,我們將完成其余獲取作業(yè)的worker,運行它們,并對worker進程執(zhí)行多個操作。

while (true) {
    if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
        $this->pauseWorker($options, $lastRestart);

        continue;
    }

    $job = $this->getNextJob(
        $this->manager->connection($connectionName), $queue
    );

    $this->registerTimeoutHandler($job, $options);

    if ($job) {
        $this->runJob($job, $connectionName, $options);
    } else {
        $this->sleep($options->sleep);
    }

    $this->stopIfNecessary($options, $lastRestart);
}
確定worker是否應該處理作業(yè)

調用 daemonShouldRun() 檢查以下情況:

應用程序不處于維護模式

Worker沒有暫停

沒有事件監(jiān)聽器阻止循環(huán)繼續(xù)

如果應用程序在維護模式下,worker使用--force選項仍然可以處理作業(yè):

php artisan queue:work --force

確定worker是否應該繼續(xù)的條件之一是:

$this->events->until(new EventsLooping($connectionName, $queue)) === false)

這行觸發(fā) QueueEventLooping 事件,并檢查是否有任何監(jiān)聽器在 handle() 方法中返回false,這種情況下你可以強制您的workers暫時停止處理作業(yè)。

如果worker應該暫停,則調用 pauseWorker() 方法:

protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
    $this->sleep($options->sleep > 0 ? $options->sleep : 1);

    $this->stopIfNecessary($options, $lastRestart);
}

sleep 方法并傳遞給控制臺命令的 --sleep 選項,這個方法調用

public function sleep($seconds)
{
    sleep($seconds);
}

腳本休眠了一段時間后,我們檢查worker是否應該在這種情況下退出并殺死腳本,稍后我們看一下stopIfNecessary 方法,以防腳本不能被殺死,我們只需調用 continue; 開始一個新的循環(huán):

if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
    $this->pauseWorker($options, $lastRestart);

    continue;
}
Retrieving 要運行的作業(yè)
$job = $this->getNextJob(
    $this->manager->connection($connectionName), $queue
);

getNextJob() 方法接受一個隊列連接的實例,我們從隊列中獲取作業(yè)

protected function getNextJob($connection, $queue)
{
    try {
        foreach (explode(",", $queue) as $queue) {
            if (! is_null($job = $connection->pop($queue))) {
                return $job;
            }
        }
    } catch (Exception $e) {
        $this->exceptions->report($e);

        $this->stopWorkerIfLostConnection($e);
    }
}

我們簡單地循環(huán)給定的隊列,使用選擇的隊列連接從存儲空間(數據庫,redis,sqs,...)獲取作業(yè)并返回該作業(yè)。

要從存儲中retrieve作業(yè),我們查詢滿足以下條件的最舊作業(yè):

推送到 queue ,我們試圖從中找到作業(yè)

沒有被其他worker reserved

可以在給定的時間內運行,有些作業(yè)在將來被推遲運行

我們也取到了很久以來被凍結的作業(yè)并重試

一旦我們找到符合這一標準的作業(yè),我們將這個作業(yè)標記為reserved,以便其他workers獲取到,我們還會增加作業(yè)監(jiān)控次數。

監(jiān)控作業(yè)超時

下一個作業(yè)被retrieved之后,我們調用 registerTimeoutHandler() 方法:

protected function registerTimeoutHandler($job, WorkerOptions $options)
{
    if ($this->supportsAsyncSignals()) {
        pcntl_signal(SIGALRM, function () {
            $this->kill(1);
        });the

        $timeout = $this->timeoutForJob($job, $options);

        pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0);
    }
}

再次,如果 pcntl 擴展被加載,我們將注冊一個信號處理程序干掉worker進程如果該作業(yè)超時的話,在配置了超時之后我們使用 pcntl_alarm() 來發(fā)送一個 SIGALRM 信號。

如果作業(yè)所花費的時間超過了超時值,處理程序將會終止該腳本,如果不是該作業(yè)將通過,并且下一個循環(huán)將設置一個新的報警覆蓋第一個報警,因為進程中可能存在單個報警。

作業(yè)只在PHP7.1以上起效,在window上也無效 ˉ_(ツ)_/ˉ

處理作業(yè)

runJob() 方法調用 process():

public function process($connectionName, $job, WorkerOptions $options)
{
    try {
        $this->raiseBeforeJobEvent($connectionName, $job);

        $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
            $connectionName, $job, (int) $options->maxTries
        );

        $job->fire();

        $this->raiseAfterJobEvent($connectionName, $job);
    } catch (Exception $e) {
        $this->handleJobException($connectionName, $job, $options, $e);
    }
}

raiseBeforeJobEvent() 觸發(fā) QueueEventsJobProcessing 事件, raiseAfterJobEvent() 觸發(fā) QueueEventsJobProcessed 事件。 markJobAsFailedIfAlreadyExceedsMaxAttempts() 檢查進程是否達到最大嘗試次數,并將該作業(yè)標記為失?。?/p>

protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
    $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

    if ($maxTries === 0 || $job->attempts() <= $maxTries) {
        return;
    }

    $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
        "A queued job has been attempted too many times. The job may have previously timed out."
    ));

    throw $e;
}

否則我們在作業(yè)對象上調用 fire() 方法來運行作業(yè)。

從哪里獲取作業(yè)對象

getNextJob() 方法返回一個 ContractsQueueJob 的實例,這取決于我們使用相應的Job實例的隊列驅動程序,例如如果數據庫隊列驅動則選擇 QueueJobsDatabaseJob 。

循環(huán)結束

在循環(huán)結束時,我們調用 stopIfNecessary() 來檢查在下一個循環(huán)開始之前是否應該停止進程:

protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
    if ($this->shouldQuit) {
        $this->kill();
    }

    if ($this->memoryExceeded($options->memory)) {
        $this->stop(12);
    } elseif ($this->queueShouldRestart($lastRestart)) {
        $this->stop();
    }
}

shouldQuit 屬性在兩種情況下設置,首先listenForSignals() 內部的作為 SIGTERM 信號處理程序,其次在 stopWorkerIfLostConnection()

protected function stopWorkerIfLostConnection($e)
{
    if ($this->causedByLostConnection($e)) {
        $this->shouldQuit = true;
    }
}

在retrieving和處理作業(yè)時,會在幾個try ... catch語句中調用此方法,以確保worker應該處于被干掉的狀態(tài),以便我們的Process Control可能會啟動一個新的數據庫連接。

causedByLostConnection() 方法可以在 DatabaseDetectsLostConnections trait中找到。
memoryExceeded() 檢查內存使用情況是否超過當前設置的內存限制,您可以使用 --memory 選項設置限制。

轉載請注明:?轉載自Ryan是菜鳥 | LNMP技術棧筆記

如果覺得本篇文章對您十分有益,何不 打賞一下

本文鏈接地址:?剖析Laravel隊列系統(tǒng)--Worker

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://www.ezyhdfw.cn/yun/23262.html

相關文章

  • 剖析Laravel隊列系統(tǒng)--準備隊列作業(yè)

    摘要:原文鏈接我們推送到隊列的每個作業(yè)都存儲在按執(zhí)行順序排序的某些存儲空間中,該存儲位置可以是數據庫,存儲或像這樣的第三方服務。這個數字從開始,在每次運行作業(yè)時不斷增加。 原文鏈接https://divinglaravel.com/queue-system/preparing-jobs-for-queue Every job we push to queue is stored in som...

    marek 評論0 收藏0
  • 剖析Laravel隊列系統(tǒng)--推送作業(yè)到隊列

    摘要:有幾種有用的方法可以使用將作業(yè)推送到特定的隊列在給定的秒數之后推送作業(yè)延遲后將作業(yè)推送到特定的隊列推送多個作業(yè)推送特定隊列上的多個作業(yè)調用這些方法之后,所選擇的隊列驅動會將給定的信息存儲在存儲空間中,供按需獲取。 原文鏈接https://divinglaravel.com/queue-system/pushing-jobs-to-queue There are several ways...

    maochunguang 評論0 收藏0
  • 剖析Laravel隊列系統(tǒng)--初探

    摘要:配有內置的隊列系統(tǒng),可幫助您在后臺運行任務,并通過簡單的來配置系統(tǒng)在不同情況下起作用。您可以在中管理隊列配置,默認情況下它有使用不同隊列驅動的幾個連接,您可以看到項目中可以有多個隊列連接,也可以使用多個隊列驅動程序。 原文鏈接https://divinglaravel.com/queue-system/before-the-dive Laravel receives a request...

    pubdreamcc 評論0 收藏0
  • Swoft 源碼剖析 - 連接池

    摘要:基于擴展實現真正的數據庫連接池這種方案中,項目占用的連接數僅僅為。一種是連接暫時不再使用,其占用狀態(tài)解除,可以從使用者手中交回到空閑隊列中這種我們稱為連接的歸隊。源碼剖析系列目錄 作者:bromine鏈接:https://www.jianshu.com/p/1a7...來源:簡書著作權歸作者所有,本文已獲得作者授權轉載,并對原文進行了重新的排版。Swoft Github: https:...

    rozbo 評論0 收藏0
  • Laravel5.2隊列驅動expire參數設置帶來的重復執(zhí)行問題 數據庫驅動

    摘要:已經取消了參數,都用來執(zhí)行。取數據的過程事物處理已經打開。取得符合條件的隊列后程序會更新該條數據,并且更新完后即。 connections => [ .... database => [ driver => database, table => jobs, queue => defaul...

    ysl_unh 評論0 收藏0

發(fā)表評論

0條評論

CollinPeng

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<