亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

RabbitMQ+PHP 教程二(Work Queues)

iKcamp / 2310人閱讀

摘要:平均每個消費(fèi)者將得到相同數(shù)量的消息。消息確認(rèn)完成任務(wù)可能需要幾秒鐘。為了確保消息不會丟失,支持消息確認(rèn)。沒有任何消息超時當(dāng)這個消費(fèi)者中止了,將會重新分配消息時。這是因為只是調(diào)度消息時,消息進(jìn)入隊列。

介紹

在上一個 Hello World 教程中,我們編寫了從指定隊列發(fā)送和接收消息的程序。在這篇文章中,我們將創(chuàng)建一個工作隊列,用于在多個工人(消費(fèi)者)之間分配耗時的任務(wù)。

工作隊列(又名任務(wù)隊列)背后的主要思想是避免立即執(zhí)行資源密集型任務(wù),必須等待它完成。相反,我們計劃稍后完成任務(wù)。我們將任務(wù)封裝為消息并將其發(fā)送到隊列中。后臺運(yùn)行的一個工作進(jìn)程將彈出任務(wù)并最終執(zhí)行該任務(wù)。當(dāng)你運(yùn)行許多工人(消費(fèi)者)時,任務(wù)將在他們之間分擔(dān)。

這個概念在Web應(yīng)用程序中尤其有用,因為在短HTTP請求中不可能處理復(fù)雜的任務(wù)。

先決條件

在本教程的前一部分,我們發(fā)送了一條包含“Hello World”的消息?,F(xiàn)在,我們將發(fā)送支持復(fù)雜任務(wù)的字符串。我們沒有一個真實環(huán)境的任務(wù),如圖像進(jìn)行調(diào)整或PDF文件的渲染,讓我們利用sleep()模擬真實環(huán)境的業(yè)務(wù)功能。我們將字符串中的點數(shù)作為其復(fù)雜度;每個點都將占“工作”的一秒鐘。例如,由Hello...描述的一個偽任務(wù)…需要三秒。

new_task.php

我們會稍微修改send.php代碼從我們先前的例子,允許任意的消息是從命令行發(fā)送。這一計劃將任務(wù)分配給我們的工作隊列,所以我們命名它 new_task.php

$data = implode(" ", array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, "", "hello");

echo " [x] Sent ", $data, "
";

我們的上一個版本的receive.php腳本也需要一些改變:它需要假第二工作在消息體中每一點。它會從隊列彈出消息和執(zhí)行任務(wù),所以讓我們把命名worker.php:

$callback = function($msg){
  echo " [x] Received ", $msg->body, "
"; //根據(jù)"."數(shù)量個數(shù)獲取延遲時間,單位秒
  sleep(substr_count($msg->body, "."));  //模擬業(yè)務(wù)執(zhí)行時間延遲
  echo " [x] Done", "
";
};
$channel->basic_consume("hello", "", false, true, false, false, $callback);
單worker簡單運(yùn)行測試 消費(fèi)者
php worker.php
消息生產(chǎn)者
php new_task.php "A very hard task which takes two seconds.."
循環(huán)調(diào)度

一個使用任務(wù)隊列的優(yōu)點是容易并行工作的能力。如果我們積壓了大量的工作,我們可以增加更多的工人,這樣就可以輕松地規(guī)?;?。

首先,讓我們嘗試同時運(yùn)行兩worker.php腳本。他們都會從隊列中獲得消息,看看效果如何?讓我們看看。

你需要打開三個console命令。兩將運(yùn)行worker.php腳本。這些控制臺將是我們的兩個消費(fèi)者C1和C2。

消費(fèi)者1
php worker.php
消費(fèi)者2
php worker.php
消息生產(chǎn)者
php new_task.php msg1...

默認(rèn)情況下,RabbitMQ將會發(fā)送的每一條消息給下一個消費(fèi)者,在序列。平均每個消費(fèi)者將得到相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)輪詢。試著用三個或更多的工人。

消息確認(rèn)

完成任務(wù)可能需要幾秒鐘。你可能遇到如果一個消費(fèi)者開始一個長期的任務(wù),并且只完成了部分任務(wù),那么會發(fā)生什么?。我們目前的代碼,一旦RabbitMQ發(fā)送一個消息給客戶立即標(biāo)記為刪除。在這種情況下,如果您中止一個消費(fèi)者,我們將丟失它正在處理的消息。我們還將丟失發(fā)送給該消費(fèi)者所有的尚未處理的消息。

如果我們不想失去任何任務(wù)。如果一個消費(fèi)者意外中止了,我們希望把任務(wù)交給另一個消費(fèi)者。

為了確保消息不會丟失,RabbitMQ支持消息確認(rèn)。ACK(nowledgement)消費(fèi)者返回的結(jié)果告訴RabbitMQ有一條消息收到,你可以自由可控的刪除他

如果一個消費(fèi)者中止了(其通道關(guān)閉,連接被關(guān)閉,或TCP連接丟失)不發(fā)送ACK,RabbitMQ將會理解這個消息并沒有完全處理,將它重新加入隊列。如果有其他用戶同時在線,它就會快速地傳遞到另一個消費(fèi)者。這樣,即使意外中止了,也可以確保沒有丟失信息。

沒有任何消息超時;當(dāng)這個消費(fèi)者中止了,RabbitMQ將會重新分配消息時。即使處理消息花費(fèi)很長很長時間也很好。

消息確認(rèn)是默認(rèn)關(guān)閉??赏ㄟ^設(shè)置的第四個參數(shù)basic_consume設(shè)置為false(true意味著沒有ACK)和從消費(fèi)者發(fā)送合適的確認(rèn),一旦我們完成一個任務(wù)。

$callback = function($msg){
  echo " [x] Received ", $msg->body, "
";
  sleep(substr_count($msg->body, "."));
  echo " [x] Done", "
";
  $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
};

$channel->basic_consume("task_queue", "", false, false, false, false, $callback);

使用此代碼,我們可以確信,即使在處理消息時使用Ctrl + C殺死一名消費(fèi)者,也不會丟失任何東西。消費(fèi)者中止都未確認(rèn)的消息后很快會被重新分配。

忘了確認(rèn)(Forgotten acknowledgment)

丟失ACK確認(rèn)是一個常見的錯誤。這是一個容易犯的錯誤,但后果很嚴(yán)重。當(dāng)你的客戶退出,消息會被重新分配(這可能看起來像是隨機(jī)的分配),RabbitMQ將會消耗更多的內(nèi)存,它不會釋放任何延遲確認(rèn)消息。

為了調(diào)試這種錯誤,你可以使用rabbitmqctl打印messages_unacknowledged字段:

rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化(Message durability)

我們已經(jīng)學(xué)會了如何確保即使消費(fèi)者死了,任務(wù)也不會丟失。但是如果RabbitMQ服務(wù)器停止了,我們的任務(wù)仍然有可能會丟失。

當(dāng)RabbitMQ退出或崩潰了,會丟失隊列和消息除非你不要。要確保消息不會丟失,需要兩件事:我們需要將隊列和消息都標(biāo)記為持久的。

首先,我們需要確保RabbitMQ永遠(yuǎn)不會丟失隊列。為了做到這一點,我們需要聲明它是持久的。為此我們通過queue_declare作為第三參數(shù)為true:

$channel->queue_declare("hello", false, true, false, false);

雖然這個命令本身是正確的,但它在我們當(dāng)前的設(shè)置中不起作用。這是因為我們已經(jīng)定義了一個名為hello的隊列,該隊列不持久。RabbitMQ不允許你重新定義現(xiàn)有隊列用不同的參數(shù),將返回一個錯誤的任何程序,試圖這么做。但有一個快速的解決方法-讓我們聲明一個名稱不同的隊列,例如task_queue:

$channel->queue_declare("task_queue", false, true, false, false);

需要應(yīng)用到生產(chǎn)者和消費(fèi)者代碼中設(shè)置為true。

在這一點上,我們可以確保即使RabbitMQ重啟了,task_queue隊列不會丟失?,F(xiàn)在我們要標(biāo)記我們的消息持續(xù)通過設(shè)置delivery_mode = 2消息屬性,amqpmessage作為屬性數(shù)組的一部分。

$msg = new AMQPMessage($data, array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
關(guān)于消息持久性的說明(Note on message persistence)

將消息標(biāo)記為持久性不能完全保證消息不會丟失。雖然它告訴RabbitMQ保存信息到磁盤上,還有一個短的時間窗口時,RabbitMQ 已經(jīng)接受信息并沒有保存它。另外,RabbitMQ不做fsync(2)每一個消息--它可能只是保存到緩存并沒有真正寫入到磁盤。持久性保證不強(qiáng),但對于我們的簡單任務(wù)隊列來說已經(jīng)足夠了。如果你需要更強(qiáng)的保證,那么你可以使用消費(fèi)者確認(rèn)。

公平調(diào)度

您可能已經(jīng)注意到,調(diào)度仍然不完全按照我們的要求工作。例如,在一個有兩個消費(fèi)者的情況下,當(dāng)所有的奇數(shù)信息都很重,甚至很輕的消息,一個消費(fèi)者會一直忙,而另一個消費(fèi)者幾乎不做任何工作。嗯,RabbitMQ不知道發(fā)生了什么事,仍將均勻消息發(fā)送。

這是因為RabbitMQ只是調(diào)度消息時,消息進(jìn)入隊列。當(dāng)存在未確認(rèn)的消息時。它只是盲目的分發(fā)n-th條消息給n-th個消費(fèi)者。

為了改變這個分配方式,我們可以調(diào)用basic_qos方法,設(shè)置參數(shù)prefetch_count = 1。這告訴RabbitMQ不要在一個時間給一個消費(fèi)者多個消息?;蛘撸瑩Q句話說,在處理和確認(rèn)以前的消息之前,不要向消費(fèi)者發(fā)送新消息。相反,它將發(fā)送給下一個仍然不忙的消費(fèi)者。

$channel->basic_qos(null, 1, null);
關(guān)于隊列大小的注釋(Note about queue size)

如果所有的消費(fèi)者都很忙,你的隊列填滿了。你會想留意到這一點,也許增加更多的工人,或者有其他的策略。

源碼 new_task.php
channel();

$channel->queue_declare("task_queue", false, true, false, false);

$data = implode(" ", array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT)
                      );

$channel->basic_publish($msg, "", "task_queue");

echo " [x] Sent ", $data, "
";

$channel->close();
$connection->close();

?>
worker.php
channel();

$channel->queue_declare("task_queue", false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C", "
";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "
";
  sleep(substr_count($msg->body, "."));
  echo " [x] Done", "
";
  $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume("task_queue", "", false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

使用消息的確認(rèn)和預(yù)取,你可以設(shè)置一個工作隊列。耐久性的配置選項讓任務(wù)存在,即使RabbitMQ重啟。

學(xué)習(xí)如何向許多消費(fèi)者傳遞同樣的信息, 你可以閱讀下一章節(jié):RabbitMQ+PHP 教程三(Publish/Subscribe)。

翻譯來自 RabbitMQ - RabbitMQ tutorial - Work Queues

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/26023.html

相關(guān)文章

  • RabbitMQ+PHP 教程一(Hello World)

    摘要:在中間的框是一個隊列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊列匹配。 介紹 RabbitMQ是一個消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個郵局:當(dāng)你把郵件放在信箱里時,你可以肯定郵差先生最終會把郵件送到你的收件人那里。在這個比喻中,RabbitMQ就...

    silencezwm 評論0 收藏0
  • 【譯】RabbitMQ系列()-Work模式

    摘要:每個消費(fèi)者會得到平均數(shù)量的。為了確保不會丟失,采用確認(rèn)機(jī)制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒有發(fā)送,會認(rèn)為該消息沒有完整的執(zhí)行,會將該消息重新入隊。該消息會被發(fā)送給其他的。當(dāng)消費(fèi)者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...

    lcodecorex 評論0 收藏0
  • RabbitMQ+PHP 消息隊列環(huán)境配置

    摘要:參考文檔依賴包安裝環(huán)境配置環(huán)境變量增加內(nèi)容保存退出,并刷新變量測試是否安裝成功安裝完成以后,執(zhí)行看是否能打開,用退出,注意后面的點號,那是的結(jié)束符。 參考文檔:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依賴包安裝 yum install ncurses-d...

    geekidentity 評論0 收藏0
  • RabbitMQ+PHP 教程三(Publish/Subscribe)

    摘要:在客戶端中,當(dāng)我們將隊列名稱作為空字符串提供時,我們創(chuàng)建一個帶有生成名稱的非持久隊列方法返回時,變量包含一個隨機(jī)生成的隊列名稱。交換和隊列之間的關(guān)系稱為綁定。 使用 php-amqplib 介紹 在前面的教程中,我們創(chuàng)建了一個工作隊列。工作隊列背后的假設(shè)是每個任務(wù)都交付給一個工作人員處理。在這一部分中,我們將做一些完全不同的事情——我們將向多個消費(fèi)者發(fā)送消息。此模式稱為發(fā)布/訂閱。 ...

    Neilyo 評論0 收藏0
  • [譯] RabbitMQ tutorials (2) ---- 'work queue&#

    摘要:這樣的消息分發(fā)機(jī)制稱作輪詢。在進(jìn)程掛了之后,所有的未被確認(rèn)的消息會被重新分發(fā)。忘記確認(rèn)這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。 工作隊列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個程序從已經(jīng)聲明的隊列中收發(fā)...

    joyvw 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<