摘要:消費(fèi)者開發(fā)本例我們使用的多進(jìn)程開發(fā)工具來完成這個(gè)需求,通常使用常駐進(jìn)程來處理隊(duì)列的消費(fèi),所以我們使用的類型,模式。中進(jìn)程負(fù)責(zé)執(zhí)行郵件發(fā)送任務(wù)。此時(shí)終端將打印成功收到測(cè)試郵件官網(wǎng)
注意:這個(gè)是 MixPHP V1 的范例
郵件發(fā)送是很常見的需求,由于發(fā)送郵件的操作一般是比較耗時(shí)的,所以我們一般采用異步處理來提升用戶體驗(yàn),而異步通常我們使用消息隊(duì)列來實(shí)現(xiàn)。
傳統(tǒng) MVC 框架由于缺少多進(jìn)程開發(fā)能力,通常是采用同一個(gè)腳本執(zhí)行多次,產(chǎn)生多個(gè)進(jìn)程的方式,mixphp 封裝了 TaskExecutor 專用于多進(jìn)程開發(fā),用戶能非常簡(jiǎn)單的開發(fā)出功能完善的高可用多進(jìn)程應(yīng)用。
下面演示一個(gè)異步郵件發(fā)送系統(tǒng)的開發(fā)過程,涉及知識(shí)點(diǎn):
異步
消息隊(duì)列
多進(jìn)程
守護(hù)進(jìn)程
如何使用消息隊(duì)列實(shí)現(xiàn)異步PHP 使用消息隊(duì)列通常是使用中間件來實(shí)現(xiàn),常用的消息中間件有:
redis
rabbitmq
kafka
本次我們選用 redis 來實(shí)現(xiàn)異步郵件發(fā)送,redis 的數(shù)據(jù)類型中有一個(gè) list 類型,可實(shí)現(xiàn)消息隊(duì)列,使用以下命令:
// 入列 $redis->lpush($key, $data); // 出列 $data = $redis->rpop($key); // 阻塞出列 $data = $redis->brpop($key, 10);架構(gòu)設(shè)計(jì)
本實(shí)例由傳統(tǒng) MVC 框架投遞郵件發(fā)送需求,MixPHP 多進(jìn)程執(zhí)行發(fā)送任務(wù)。
郵件發(fā)送庫(kù)選型以往我們通常使用框架提供的郵件發(fā)送庫(kù),或者網(wǎng)上下載別的用戶分享的庫(kù),composer 出現(xiàn)后,https://packagist.org/ 上有大量?jī)?yōu)質(zhì)的庫(kù),我們只需選擇一個(gè)最好的即可,本例選擇 swiftmailer。
由于發(fā)送任務(wù)是由 MixPHP 執(zhí)行,所以 swiftmailer 是安裝在 MixPHP 項(xiàng)目中,在項(xiàng)目根目錄中執(zhí)行以下命令安裝:
composer require swiftmailer/swiftmailer生產(chǎn)者開發(fā)
在郵件發(fā)送這個(gè)需求中生產(chǎn)者是指投遞發(fā)送任務(wù)的一方,這一方通常是一個(gè)接口或網(wǎng)頁,這個(gè)部分并不一定需 mixphp 開發(fā),TP、CI、YII 這些都可以,只需在接口或網(wǎng)頁中把任務(wù)信息投遞到消息隊(duì)列中即可。
在傳統(tǒng) MVC 框架的控制器中增加如下代碼:
通??蚣苤惺褂?redis 會(huì)安裝一個(gè)類庫(kù)來使用,本例使用原生代碼,便于理解。
// 連接 $redis = new Redis(); if (!$redis->connect("127.0.0.1", 6379)) { throw new Exception("Redis Connect Failure"); } $redis->auth(""); $redis->select(0); // 投遞任務(wù) $data = [ "to" => ["***@qq.com" => "A name"], "body" => "Here is the message itself", "subject" => "The title content", ]; $redis->lpush("queue:email", serialize($data));
通常異步開發(fā)中,投遞完成后就會(huì)立即響應(yīng)一個(gè)消息給用戶,當(dāng)然此時(shí)該任務(wù)并沒有執(zhí)行。
消費(fèi)者開發(fā)本例我們使用 MixPHP 的多進(jìn)程開發(fā)工具 TaskExecutor 來完成這個(gè)需求,通常使用常駐進(jìn)程來處理隊(duì)列的消費(fèi),所以我們使用 TaskExecutor 的 TYPE_DAEMON 類型,MODE_PUSH 模式。
TaskExecutor 的 MODE_PUSH 模式有二種進(jìn)程:
左進(jìn)程:負(fù)責(zé)從消息隊(duì)列取出任務(wù)數(shù)據(jù),投放給中進(jìn)程。
中進(jìn)程:負(fù)責(zé)執(zhí)行郵件發(fā)送任務(wù)。
PushCommand.php 代碼如下:
*/ class PushCommand extends BaseCommand { // 配置信息 const HOST = "smtpdm.aliyun.com"; const PORT = 465; const SECURITY = "ssl"; const USERNAME = "****@email.***.com"; const PASSWORD = "****"; // 初始化事件 public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 獲取程序名稱 $this->programName = Input::getCommandName(); // 設(shè)置pidfile $this->pidFile = "/var/run/{$this->programName}.pid"; } /** * 獲取服務(wù) * @return TaskExecutor */ public function getTaskService() { return create_object( [ // 類路徑 "class" => "mix askTaskExecutor", // 服務(wù)名稱 "name" => "mix-daemon: {$this->programName}", // 執(zhí)行類型 "type" => mix askTaskExecutor::TYPE_DAEMON, // 執(zhí)行模式 "mode" => mix askTaskExecutor::MODE_PUSH, // 左進(jìn)程數(shù) "leftProcess" => 1, // 中進(jìn)程數(shù) "centerProcess" => 5, // 任務(wù)超時(shí)時(shí)間 (秒) "timeout" => 5, ] ); } // 啟動(dòng) public function actionStart() { // 預(yù)處理 if (!parent::actionStart()) { return ExitCode::UNSPECIFIED_ERROR; } // 啟動(dòng)服務(wù) $service = $this->getTaskService(); $service->on("LeftStart", [$this, "onLeftStart"]); $service->on("CenterStart", [$this, "onCenterStart"]); $service->start(); // 返回退出碼 return ExitCode::OK; } // 左進(jìn)程啟動(dòng)事件回調(diào)函數(shù) public function onLeftStart(LeftProcess $worker) { try { // 模型內(nèi)使用長(zhǎng)連接版本的數(shù)據(jù)庫(kù)組件,這樣組件會(huì)自動(dòng)幫你維護(hù)連接不斷線 $queueModel = Redis::getInstance(); // 保持任務(wù)執(zhí)行狀態(tài),循環(huán)結(jié)束后當(dāng)前進(jìn)程會(huì)退出,主進(jìn)程會(huì)重啟一個(gè)新進(jìn)程繼續(xù)執(zhí)行任務(wù),這樣做是為了避免長(zhǎng)時(shí)間執(zhí)行內(nèi)存溢出 for ($j = 0; $j < 16000; $j++) { // 從消息隊(duì)列中間件阻塞獲取一條消息 $data = $queueModel->brpop("queue:email", 10); if (empty($data)) { continue; } list(, $data) = $data; // 將消息推送給中進(jìn)程去處理,push有長(zhǎng)度限制 (https://wiki.swoole.com/wiki/page/290.html) $worker->push($data, false); } } catch (Exception $e) { // 休息一會(huì),避免 CPU 出現(xiàn) 100% sleep(1); // 拋出錯(cuò)誤 throw $e; } } // 中進(jìn)程啟動(dòng)事件回調(diào)函數(shù) public function onCenterStart(CenterProcess $worker) { // 保持任務(wù)執(zhí)行狀態(tài),循環(huán)結(jié)束后當(dāng)前進(jìn)程會(huì)退出,主進(jìn)程會(huì)重啟一個(gè)新進(jìn)程繼續(xù)執(zhí)行任務(wù),這樣做是為了避免長(zhǎng)時(shí)間執(zhí)行內(nèi)存溢出 for ($j = 0; $j < 16000; $j++) { // 從進(jìn)程消息隊(duì)列中搶占一條消息 $data = $worker->pop(); if (empty($data)) { continue; } // 處理消息 try { // 處理消息,比如:發(fā)送短信、發(fā)送郵件、微信推送 var_dump($data); $ret = self::sendEmail($data); var_dump($ret); } catch (Exception $e) { // 回退數(shù)據(jù)到消息隊(duì)列 $worker->rollback($data); // 休息一會(huì),避免 CPU 出現(xiàn) 100% sleep(1); // 拋出錯(cuò)誤 throw $e; } } } // 發(fā)送郵件 public static function sendEmail($data) { // Create the Transport $transport = (new Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY)) ->setUsername(self::USERNAME) ->setPassword(self::PASSWORD); // Create the Mailer using your created Transport $mailer = new Swift_Mailer($transport); // Create a message $message = (new Swift_Message($data["subject"])) ->setFrom([self::USERNAME => "**網(wǎng)"]) ->setTo($data["to"]) ->setBody($data["body"]); // Send the message $result = $mailer->send($message); return $result; } }測(cè)試
在 shell 中啟動(dòng) push 常駐程序。
[root@localhost bin]# ./mix-daemon push start mix-daemon "push" start successed.
調(diào)用接口往消息隊(duì)列投放任務(wù)。
此時(shí) shell 終端將打?。?/p>
成功收到測(cè)試郵件:
MixPHPGitHub: https://github.com/mixstart/m...
官網(wǎng):http://www.mixphp.cn/
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/28918.html
摘要:消費(fèi)者開發(fā)使用本例時(shí),請(qǐng)確保你使用的編譯時(shí)開啟了本例我們采用的守護(hù)程序協(xié)程池來完成一個(gè)超高性能的郵件發(fā)送程序。 去年 Mix PHP V1 發(fā)布時(shí),我寫了一個(gè)多進(jìn)程的郵件發(fā)送實(shí)例: 使用 mixphp 打造多進(jìn)程異步郵件發(fā)送,今年 Mix PHP V2 發(fā)布,全面的協(xié)程支持讓我們可以使用一個(gè)進(jìn)程就可達(dá)到之前多個(gè)進(jìn)程都無法達(dá)到的更高 IO 性能,所以今天重寫一個(gè)協(xié)程池版本的郵件發(fā)送實(shí)例。...
摘要:異步隊(duì)列消費(fèi)者開發(fā)只提供了模式下運(yùn)行控制器方法,并未提供主進(jìn)程多子進(jìn)程的進(jìn)程模型,并未提供多線程處理。多線程異步隊(duì)列服務(wù)只需寫好控制器方法,然后在配置文件中配置下路由命名空間進(jìn)程線程數(shù)量,就可在模式中啟動(dòng)多進(jìn)程多線程模型的異步隊(duì)列處理程序。 最近業(yè)余時(shí)間一直在開發(fā)ExpressPHP的第二個(gè)版本 MixPHP,今天下班想起之前一個(gè)面試官的問題:你為什么還要再造一個(gè)輪子呢?仔細(xì)回想,第一...
摘要:如何使用優(yōu)化高并發(fā)場(chǎng)景寫庫(kù)或者耗時(shí)計(jì)算在的接口中使用消息隊(duì)列,把要入庫(kù)的數(shù)據(jù)寫入的類型中。高容錯(cuò)子進(jìn)程異常奔潰時(shí),主進(jìn)程將重建子進(jìn)程。高性能多進(jìn)程運(yùn)行,充分利用多個(gè)并行計(jì)算,性能強(qiáng)勁。 經(jīng)常在群里聽到一些朋友問:TP 的項(xiàng)目怎么遷移到 mixphp 來處理高并發(fā),我通常都是回復(fù)需要重寫,可是一個(gè)開發(fā)很久的 TP 項(xiàng)目,代碼量巨大,又怎么可能會(huì)花大量時(shí)間成本來重寫呢? 那么為何我們不嘗試...
摘要:框架最新源代碼行數(shù)行,因此可以很容易的改造它,成為你們公司的專屬框架。也不同于其他基于的微服務(wù)框架,只聚焦于微服務(wù)治理,定位于開發(fā)的更多領(lǐng)域,覆蓋從初創(chuàng)到億元級(jí)體量的技術(shù)訴求。的授權(quán)全靠用戶自愿購(gòu)買,詳情 MixPHP是什么 MixPHP 是秉承 普及 PHP 常駐內(nèi)存型解決方案,促進(jìn) PHP 往更后端發(fā)展 的理念而創(chuàng)造,采用 Swoole 擴(kuò)展作為底層引擎,圍繞常駐內(nèi)存的方式而設(shè)計(jì),...
摘要:在多種環(huán)境中遷移,代碼無需修改,是無縫遷移的。由于大部分用戶開發(fā)是在中進(jìn)行,因此開發(fā)階段我們推薦使用部署方案,因?yàn)楦?jiǎn)單快速,下面整體演示一下的環(huán)境搭建。安裝解壓至指定安裝目錄。先不要啟動(dòng),這會(huì)啟動(dòng)會(huì)報(bào)錯(cuò),沒加環(huán)境變量。 MixPHP 是一款基于 Swoole 的常駐內(nèi)存型 PHP 高性能框架。 MixPHP 同時(shí)支持多種環(huán)境中執(zhí)行: Nginx + mix-httpd (使用到 S...
閱讀 2972·2021-09-22 15:20
閱讀 3105·2021-09-22 15:19
閱讀 3614·2021-09-22 15:15
閱讀 2623·2021-09-08 09:35
閱讀 2454·2019-08-30 15:44
閱讀 3098·2019-08-30 10:50
閱讀 3916·2019-08-29 16:25
閱讀 1675·2019-08-26 13:55