摘要:在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),保持代表的消費(fèi)。本教程介紹,這是一個(gè)開放的通用的協(xié)議消息。我們將在本教程中使用,解決依賴管理。發(fā)送者將連接到,發(fā)送一條消息,然后退出。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。
介紹
RabbitMQ是一個(gè)消息代理器:它接受和轉(zhuǎn)發(fā)消息。你可以把它當(dāng)作一個(gè)郵局:當(dāng)你把郵件放在信箱里時(shí),你可以肯定郵差先生最終會(huì)把郵件送到你的收件人那里。在這個(gè)比喻中,RabbitMQ就是這里的郵箱,郵局和郵差。
RabbitMQ和郵局之間的主要區(qū)別是,它不處理紙張,而是接受、存儲(chǔ)和轉(zhuǎn)發(fā)二進(jìn)制數(shù)據(jù)?消息。
RabbitMQ,和一般的消息傳遞,使用專業(yè)術(shù)語(yǔ)。
生產(chǎn)者的工作就是發(fā)送消息。發(fā)送消息的程序是生產(chǎn)者:
隊(duì)列類比一個(gè)郵箱,存在于RabbitMQ, 然而信息流通過(guò)RabbitMQ和您的應(yīng)用程序,他們只能存儲(chǔ)在一個(gè)隊(duì)列。隊(duì)列只受主機(jī)內(nèi)存和磁盤限制的約束,它本質(zhì)上是一個(gè)很大的消息緩沖區(qū)。會(huì)有許多生產(chǎn)者可以發(fā)送到一個(gè)隊(duì)列的消息,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。這就是我們?nèi)绾伪硎娟?duì)列的方式:
消費(fèi)者和生產(chǎn)者有著相似的意義. 消費(fèi)者無(wú)非就是等待消息然后處理的程序:
請(qǐng)注意,生產(chǎn)者、消費(fèi)者和代理不必同一主機(jī)上;事實(shí)上,在大多數(shù)應(yīng)用程序中它們沒有這樣做。
"Hello World"(使用PHP amqplib客戶端)
在本教程的這一部分中,我們將用PHP編寫兩個(gè)程序;一個(gè)生產(chǎn)者發(fā)送一條消息,一個(gè)用戶接收消息并將它們打印出來(lái)。我們會(huì)PHP amqplib API的忽略一些細(xì)節(jié),集中在這個(gè)非常簡(jiǎn)單的事情剛剛開始。這是一個(gè)“Hello World”的消息傳遞。
在下圖中,“p”是我們的生產(chǎn)商,“C”是我們的消費(fèi)者。在中間的框是一個(gè)隊(duì)列的消息緩沖區(qū),RabbitMQ保持代表的消費(fèi)。
PHP amqplib客戶端庫(kù)RabbitMQ有很多協(xié)議。本教程介紹AMQP 0-9-1,這是一個(gè)開放的、通用的協(xié)議消息。有許多不同的語(yǔ)言RabbitMQ一批客戶。我們將在本教程中使用PHP amqplib,composer解決依賴管理。
添加composer.json:
{ "require": { "php-amqplib/php-amqplib": ">=2.6.1" } }
composer install # 或者 直接運(yùn)行包引入 composer require php-amqplib/php-amqplib現(xiàn)在我們可以開始我們的hello world
生產(chǎn)者(消息發(fā)送方)
我們命令我們的消息發(fā)布者(發(fā)送者)send.php和消息接收receive.php。發(fā)送者將連接到RabbitMQ,發(fā)送一條消息,然后退出。
require_once __DIR__ . "/vendor/autoload.php"; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage;
現(xiàn)在我們能創(chuàng)建一個(gè)連接服務(wù)器的Connection:
$connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest"); $channel = $connection->channel();
該連接抽象套接字(socket)連接,并為我們負(fù)責(zé)協(xié)議版本協(xié)商和認(rèn)證等。這里,我們連接到一個(gè)rabbitmq代理器在本地機(jī)器上-使用localhost。如果我們想在不同的機(jī)器上連接到一個(gè)代理,我們只需在這里指定它的名稱或IP地址。
接下來(lái),我們創(chuàng)建一個(gè)通道,這是處理事情的大部分API的地方。
發(fā)送消息前,我們必須聲明一個(gè)隊(duì)列為我們發(fā)送做準(zhǔn)備;然后我們可以向隊(duì)列發(fā)布消息:
$channel->queue_declare("hello", false, false, false, false); $msg = new AMQPMessage("Hello World!"); $channel->basic_publish($msg, "", "hello"); echo " [x] Sent "Hello World!" ";
聲明隊(duì)列是冪等的(原句:Declaring a queue is idempotent,這里的idempotent不知道是什么意思) - 只有在它不存在時(shí)才會(huì)創(chuàng)建隊(duì)列。消息內(nèi)容是一個(gè)字節(jié)數(shù)組,因此您可以在那里編碼用你喜歡的方式。
最后,我們關(guān)閉通道和連接;
$channel->close(); $connection->close();
上面我們完成了send.php.
接下來(lái)我們完成消費(fèi)方的代碼
消費(fèi)者(接收方,任務(wù)處理方)消費(fèi)者從RabbitMQ接收推來(lái)的消息,我們會(huì)保持運(yùn)行監(jiān)聽消息并打印出來(lái)。
引入lib
require_once __DIR__ . "/vendor/autoload.php"; use PhpAmqpLibConnectionAMQPStreamConnection;
設(shè)置與發(fā)布程序相同;我們打開一個(gè)連接和一個(gè)通道,并聲明將要消耗的隊(duì)列。注意,這與發(fā)送發(fā)布的隊(duì)列匹配。
$connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest"); $channel = $connection->channel(); $channel->queue_declare("hello", false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", " ";
注意,我們也在這里聲明隊(duì)列。因?yàn)槲覀兛赡茉诎l(fā)布之前啟動(dòng)消費(fèi)者,我們希望在我們嘗試從它那里消費(fèi)消息之前確定隊(duì)列的存在。
我們將告訴服務(wù)器從隊(duì)列中發(fā)送消息。我們將定義一個(gè)PHP可調(diào)用,它將接收服務(wù)器發(fā)送的消息。請(qǐng)記住,消息是從服務(wù)器異步發(fā)送到客戶機(jī)的。
$callback = function($msg) { echo " [x] Received ", $msg->body, " "; }; $channel->basic_consume("hello", "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
當(dāng)調(diào)用basic_consume,我們的代碼會(huì)阻塞。當(dāng)我們收到消息時(shí),我們的回調(diào)函數(shù)將通過(guò)接收到返回的消息傳遞。
以上是我們r(jià)eceive.php的代碼
運(yùn)行測(cè)試 運(yùn)行消費(fèi)者php receive.php運(yùn)行消息發(fā)送方
php send.php列出隊(duì)列
rabbitmqctl list_queues完整源碼(調(diào)整過(guò)) config.php
[ "path" => dirname(dirname(__DIR__)) . "/vendor" ], "rabbitmq" => [ "host" => "127.0.0.1", "port" => "5672", "login" => "qkl", "password" => "123456", "vhost" => "/" ] ]; ?>receive.php
channel(); $channel->queue_declare("hello", false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", " "; $callback = function($msg) { echo " [x] Received ", $msg->body, " "; }; $channel->basic_consume("hello", "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>send.php
channel(); //發(fā)送方其實(shí)不需要設(shè)置隊(duì)列, 不過(guò)對(duì)于持久化有關(guān),建議執(zhí)行該行 $channel->queue_declare("hello", false, false, false, false); $msg = new AMQPMessage("Hello World!"); $channel->basic_publish($msg, "", "hello"); echo " [x] Sent "Hello World!" "; $channel->close(); $connection->close(); ?>
了解如何構(gòu)建一個(gè)簡(jiǎn)單的工作隊(duì)列, 你可以閱讀下一章節(jié): RabbitMQ+PHP 教程二(Work Queues)
翻譯來(lái)自 RabbitMQ - RabbitMQ tutorial - "Hello World!"
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/26025.html
摘要:基礎(chǔ)教程注本文是對(duì)眾多博客的學(xué)習(xí)和總結(jié),可能存在理解錯(cuò)誤。請(qǐng)帶著懷疑的眼光,同時(shí)如果有錯(cuò)誤希望能指出。安裝庫(kù)這里我們首先將消息推入隊(duì)列,然后消費(fèi)者從隊(duì)列中去除消息進(jìn)行消費(fèi)。 RabbitMQ 基礎(chǔ)教程(1) - Hello World 注:本文是對(duì)眾多博客的學(xué)習(xí)和總結(jié),可能存在理解錯(cuò)誤。請(qǐng)帶著懷疑的眼光,同時(shí)如果有錯(cuò)誤希望能指出。 如果你喜歡我的文章,可以關(guān)注我的私人博客:http:...
摘要:平均每個(gè)消費(fèi)者將得到相同數(shù)量的消息。消息確認(rèn)完成任務(wù)可能需要幾秒鐘。為了確保消息不會(huì)丟失,支持消息確認(rèn)。沒有任何消息超時(shí)當(dāng)這個(gè)消費(fèi)者中止了,將會(huì)重新分配消息時(shí)。這是因?yàn)橹皇钦{(diào)度消息時(shí),消息進(jìn)入隊(duì)列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介紹 在上一個(gè) Hello World 教程中,我們編寫了從指定隊(duì)列發(fā)送...
摘要:進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為對(duì)頭。還有個(gè)專業(yè)術(shù)語(yǔ)要了解下生產(chǎn)者發(fā)送消息的應(yīng)用程序被稱為生產(chǎn)者。消費(fèi)者接收消息的應(yīng)用程序被稱為消費(fèi)者。參考鏈接下安裝及操作常用命令譯實(shí)戰(zhàn)教程一你是否聽說(shuō)過(guò)或者使用過(guò)隊(duì)列? 你是否聽說(shuō)過(guò)或者使用過(guò)消息隊(duì)列? 你是否聽說(shuō)過(guò)或者使用過(guò)RabbitMQ? 提到這幾個(gè)詞,用過(guò)的人,也許覺得很簡(jiǎn)單,沒用過(guò)的人,也許覺得很復(fù)雜,至少在我沒使用消息隊(duì)列之前,聽...
摘要:每當(dāng)我們收到一條消息,這個(gè)回調(diào)函數(shù)就被皮卡庫(kù)調(diào)用。接下來(lái),我們需要告訴這個(gè)特定的回調(diào)函數(shù)應(yīng)該從我們的隊(duì)列接收消息為了讓這個(gè)命令成功,我們必須確保我們想要訂閱的隊(duì)列存在。生產(chǎn)者計(jì)劃將在每次運(yùn)行后停止歡呼我們能夠通過(guò)發(fā)送我們的第一條消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 介紹 RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。你...
摘要:我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。為了確保消息永不丟失,支持消息確認(rèn)。沒有任何消息超時(shí)當(dāng)消費(fèi)者死亡時(shí),將重新傳遞消息。發(fā)生這種情況是因?yàn)橹辉谙⑦M(jìn)入隊(duì)列時(shí)調(diào)度消息。這告訴一次不要向工作人員發(fā)送多個(gè)消息。 源碼:https://github.com/ltoddy/rabbitmq-tutorial 工作隊(duì)列 showImg(https://segmentfault.com/img/r...
閱讀 1212·2021-11-25 09:43
閱讀 1735·2021-09-13 10:25
閱讀 2695·2021-09-09 11:38
閱讀 3573·2021-09-07 10:14
閱讀 1796·2019-08-30 15:52
閱讀 704·2019-08-30 15:44
閱讀 3661·2019-08-29 13:23
閱讀 2042·2019-08-26 13:33