摘要:作為定時(shí)任務(wù)的執(zhí)行者,通過每喚醒自身一次,然后把執(zhí)行表遍歷一次,挑選當(dāng)下需要執(zhí)行的任務(wù),通過投遞出去并更新該任務(wù)執(zhí)行表中的狀態(tài)。
作者:bromine
鏈接:https://www.jianshu.com/p/b44...
來源:簡書
著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。
Swoft Github: https://github.com/swoft-clou...
Swoft源碼剖析系列目錄:https://segmentfault.com/a/11...前言
Swoft的任務(wù)功能基于Swoole的Task機(jī)制,或者說Swoft的Task機(jī)制本質(zhì)就是對(duì)Swoole的Task機(jī)制的封裝和加強(qiáng)。
任務(wù)投遞//SwoftTaskTask.php class Task { /** * Deliver coroutine or async task * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * * @return bool|array * @throws TaskException */ public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3) { $data = TaskHelper::pack($taskName, $methodName, $params, $type); if(!App::isWorkerStatus() && !App::isCoContext()){ return self::deliverByQueue($data);//見下文Command章節(jié) } if(!App::isWorkerStatus() && App::isCoContext()){ throw new TaskException("Please deliver task by http!"); } $server = App::$server->getServer(); // Delier coroutine task if ($type == self::TYPE_CO) { $tasks[0] = $data; $prifleKey = "task" . "." . $taskName . "." . $methodName; App::profileStart($prifleKey); $result = $server->taskCo($tasks, $timeout); App::profileEnd($prifleKey); return $result; } // Deliver async task return $server->task($data); } }
任務(wù)投遞Task::deliver()將調(diào)用參數(shù)打包后根據(jù)$type參數(shù)通過Swoole的$server->taskCo()或$server->task()接口投遞到Task進(jìn)程。
Task本身始終是同步執(zhí)行的,$type僅僅影響投遞這一操作的行為,Task::TYPE_ASYNC對(duì)應(yīng)的$server->task()是異步投遞,Task::deliver()調(diào)用后馬上返回;Task::TYPE_CO對(duì)應(yīng)的$server->taskCo()是協(xié)程投遞,投遞后讓出協(xié)程控制,任務(wù)完成或執(zhí)行超時(shí)后Task::deliver()才從協(xié)程返回。
//SwoftTaskBootstrapListenersTaskEventListener /** * The listener of swoole task * @SwooleListener({ * SwooleEvent::ON_TASK, * SwooleEvent::ON_FINISH, * }) */ class TaskEventListener implements TaskInterface, FinishInterface { /** * @param SwooleServer $server * @param int $taskId * @param int $workerId * @param mixed $data * @return mixed * @throws InvalidArgumentException */ public function onTask(Server $server, int $taskId, int $workerId, $data) { try { /* @var TaskExecutor $taskExecutor*/ $taskExecutor = App::getBean(TaskExecutor::class); $result = $taskExecutor->run($data); } catch (Throwable $throwable) { App::error(sprintf("TaskExecutor->run %s file=%s line=%d ", $throwable->getMessage(), $throwable->getFile(), $throwable->getLine())); $result = false; // Release system resources App::trigger(AppEvent::RESOURCE_RELEASE); App::trigger(TaskEvent::AFTER_TASK); } return $result; } }
此處是swoole.onTask的事件回調(diào),其職責(zé)僅僅是將將Worker進(jìn)程投遞來的打包后的數(shù)據(jù)轉(zhuǎn)發(fā)給TaskExecutor。
Swoole的Task機(jī)制的本質(zhì)是Worker進(jìn)程將耗時(shí)任務(wù)投遞給同步的Task進(jìn)程(又名TaskWorker)處理,所以swoole.onTask的事件回調(diào)是在Task進(jìn)程中執(zhí)行的。上文說過,Worker進(jìn)程是你大部分HTTP服務(wù)代碼執(zhí)行的環(huán)境,但是從TaskEventListener.onTask()方法開始,代碼的執(zhí)行環(huán)境都是Task進(jìn)程,也就是說,TaskExecutor和具體的TaskBean都是執(zhí)行在Task進(jìn)程中的。
//SwoftTaskTaskExecutor /** * The task executor * * @Bean() */ class TaskExecutor { /** * @param string $data * @return mixed */ public function run(string $data) { $data = TaskHelper::unpack($data); $name = $data["name"]; $type = $data["type"]; $method = $data["method"]; $params = $data["params"]; $logid = $data["logid"] ?? uniqid("", true); $spanid = $data["spanid"] ?? 0; $collector = TaskCollector::getCollector(); if (!isset($collector["task"][$name])) { return false; } list(, $coroutine) = $collector["task"][$name]; $task = App::getBean($name); if ($coroutine) { $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type); } else { $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type); } return $result; } }
任務(wù)執(zhí)行思路很簡單,將Worker進(jìn)程發(fā)過來的數(shù)據(jù)解包還原成原來的調(diào)用參數(shù),根據(jù)$name參數(shù)找到對(duì)應(yīng)的TaskBean并調(diào)用其對(duì)應(yīng)的task()方法。其中TaskBean使用類級(jí)別注解@Task(name="TaskName")或者@Task("TaskName")聲明。
值得一提的一點(diǎn)是,@Task注解除了name屬性,還有一個(gè)coroutine屬性,上述代碼會(huì)根據(jù)該參數(shù)選擇使用協(xié)程的runCoTask()或者同步的runSyncTask()執(zhí)行Task。但是由于而且由于Swoole的Task進(jìn)程的執(zhí)行是完全同步的,不支持協(xié)程,所以目前版本請(qǐng)?jiān)搮?shù)不要配置為true。同樣的在TaskBean中編寫的任務(wù)代碼必須的同步阻塞的或者是要能根據(jù)環(huán)境自動(dòng)將異步非阻塞和協(xié)程降級(jí)為同步阻塞的
從Process中投遞任務(wù)前面我們提到:
Swoole的Task機(jī)制的本質(zhì)是Worker進(jìn)程將耗時(shí)任務(wù)投遞給同步的Task進(jìn)程(又名 TaskWorker)處理。
換句話說,Swoole的$server->taskCo()或$server->task()都只能在Worker進(jìn)程中使用。
這個(gè)限制大大的限制了使用場(chǎng)景。 如何能夠?yàn)榱四軌蛟?b>Process中投遞任務(wù)呢?Swoft為了繞過這個(gè)限制提供了Task::deliverByProcess()方法。其實(shí)現(xiàn)原理也很簡單,通過Swoole的$server->sendMessage()方法將調(diào)用信息從Process中投遞到Worker進(jìn)程中,然后由Worker進(jìn)程替其投遞到Task進(jìn)程當(dāng)中,相關(guān)代碼如下:
//SwoftTaskTask.php /** * Deliver task by process * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * @param int $workId * * @return bool */ public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool { /* @var PipeMessageInterface $pipeMessage */ $server = App::$server->getServer(); $pipeMessage = App::getBean(PipeMessage::class); $data = [ "name" => $taskName, "method" => $methodName, "params" => $params, "timeout" => $timeout, "type" => $type, ]; $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data); return $server->sendMessage($message, $workId); }
數(shù)據(jù)打包后使用$server->sendMessage()投遞給Worker:
//SwoftBootstrapServerServerTrait.php /** * onPipeMessage event callback * * @param SwooleServer $server * @param int $srcWorkerId * @param string $message * @return void * @throws InvalidArgumentException */ public function onPipeMessage(Server $server, int $srcWorkerId, string $message) { /* @var PipeMessageInterface $pipeMessage */ $pipeMessage = App::getBean(PipeMessage::class); list($type, $data) = $pipeMessage->unpack($message); App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId); }
$server->sendMessage后,Worker進(jìn)程收到數(shù)據(jù)時(shí)會(huì)觸發(fā)一個(gè)swoole.pipeMessage事件的回調(diào),Swoft會(huì)將其轉(zhuǎn)換成自己的swoft.pipeMessage事件并觸發(fā).
//SwoftTaskEventListenersPipeMessageListener.php /** * The pipe message listener * * @Listener(event=AppEvent::PIPE_MESSAGE) */ class PipeMessageListener implements EventHandlerInterface { /** * @param SwoftEventEventInterface $event */ public function handle(EventInterface $event) { $params = $event->getParams(); if (count($params) < 3) { return; } list($type, $data, $srcWorkerId) = $params; if ($type != PipeMessage::MESSAGE_TYPE_TASK) { return; } $type = $data["type"]; $taskName = $data["name"]; $params = $data["params"]; $timeout = $data["timeout"]; $methodName = $data["method"]; // delever task Task::deliver($taskName, $methodName, $params, $type, $timeout); } }
swoft.pipeMessage事件最終由PipeMessageListener處理。在相關(guān)的監(jiān)聽其中,如果發(fā)現(xiàn)swoft.pipeMessage事件由Task::deliverByProcess()產(chǎn)生的,Worker進(jìn)程會(huì)替其執(zhí)行一次Task::deliver(),最終將任務(wù)數(shù)據(jù)投遞到TaskWorker進(jìn)程中。
一道簡單的回顧練習(xí):從Task::deliverByProcess()到某TaskBean 最終執(zhí)行任務(wù),經(jīng)歷了哪些進(jìn)程,而調(diào)用鏈的哪些部分又分別是在哪些進(jìn)程中執(zhí)行?
從Command進(jìn)程或其子進(jìn)程中投遞任務(wù)//SwoftTaskQueueTask.php /** * @param string $data * @param int $taskWorkerId * @param int $srcWorkerId * * @return bool */ public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null) { if ($taskWorkerId === null) { $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum); } if ($srcWorkerId === null) { $srcWorkerId = mt_rand(0, $this->workerNum - 1); } $this->check(); $data = $this->pack($data, $srcWorkerId); $result = msg_send($this->queueId, $taskWorkerId, $data, false); if (!$result) { return false; } return true; }
對(duì)于Command進(jìn)程的任務(wù)投遞,情況會(huì)更復(fù)雜一點(diǎn)。
上文提到的Process,其往往衍生于Http/Rpc服務(wù),作為同一個(gè)Manager的子孫進(jìn)程,他們能夠拿到SwooleServer的句柄變量,從而通過$server->sendMessage(),$server->task()等方法進(jìn)行任務(wù)投遞。
但在Swoft的體系中,還有一個(gè)十分路人的角色: Command。
Command的進(jìn)程從shell或cronb獨(dú)立啟動(dòng),和Http/Rpc服務(wù)相關(guān)的進(jìn)程沒有親緣關(guān)系。因此Command進(jìn)程以及從Command中啟動(dòng)的Process進(jìn)程是沒有辦法拿到SwooleServer的調(diào)用句柄直接通過UnixSocket進(jìn)行任務(wù)投遞的。
為了為這種進(jìn)程提供任務(wù)投遞支持,Swoft利用了Swoole的Task進(jìn)程的一個(gè)特殊功能----消息隊(duì)列。
同一個(gè)項(xiàng)目中Command和HttpRpcServer 通過約定一個(gè)message_queue_key獲取到系統(tǒng)內(nèi)核中的同一條消息隊(duì)列,然后Comand進(jìn)程就可以通過該消息隊(duì)列向Task進(jìn)程投遞任務(wù)了。
該機(jī)制沒有提供對(duì)外的公開方法,僅僅被包含在Task::deliver()方法中,Swoft會(huì)根據(jù)當(dāng)前環(huán)境隱式切換投遞方式。但該消息隊(duì)列的實(shí)現(xiàn)依賴Semaphore拓展,如果你想使用,需要在編譯PHP時(shí)加上--enable-sysvmsg參數(shù)。
除了手動(dòng)執(zhí)行的普通任務(wù),Swoft還提供了精度為秒的定時(shí)任務(wù)功能用來在項(xiàng)目中替代Linux的Crontab功能.
Swoft用兩個(gè)前置Process---任務(wù)計(jì)劃進(jìn)程:CronTimerProcess和任務(wù)執(zhí)行進(jìn)程CronExecProcess
,和兩張內(nèi)存數(shù)據(jù)表-----RunTimeTable(任務(wù)(配置)表)OriginTable((任務(wù))執(zhí)行表)用于定時(shí)任務(wù)的管理調(diào)度。
兩張表的每行記錄的結(jié)構(gòu)如下:
SwoftTaskCrontabTableCrontab.php /** * 任務(wù)表,記錄用戶配置的任務(wù)信息 * 表每行記錄包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一確定一條記錄 * @var array $originStruct */ private $originStruct = [ "rule" => [SwooleTable::TYPE_STRING, 100],//定時(shí)任務(wù)執(zhí)行規(guī)則,對(duì)應(yīng)@Scheduled注解的cron屬性 "taskClass" => [SwooleTable::TYPE_STRING, 255],//任務(wù)名 對(duì)應(yīng)@Task的name屬性(默認(rèn)為類名) "taskMethod" => [SwooleTable::TYPE_STRING, 255],//Task方法,對(duì)應(yīng)@Scheduled注解所在方法 "add_time" => [SwooleTable::TYPE_STRING, 11],//初始化該表內(nèi)容時(shí)的10位時(shí)間戳 ]; /** * 執(zhí)行表,記錄短時(shí)間內(nèi)要執(zhí)行的任務(wù)列表及其執(zhí)行狀態(tài) * 表每行記錄包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一確定一條記錄 * @var array $runTimeStruct */ private $runTimeStruct = [ "taskClass" => [SwooleTable::TYPE_STRING, 255],//同上 "taskMethod" => [SwooleTable::TYPE_STRING, 255],//同上 "minute" => [SwooleTable::TYPE_STRING, 20],//需要執(zhí)行任務(wù)的時(shí)間,精確到分鐘 格式date("YmdHi") "sec" => [SwooleTable::TYPE_STRING, 20],//需要執(zhí)行任務(wù)的時(shí)間,精確到分鐘 10位時(shí)間戳 "runStatus" => [SwooleTABLE::TYPE_INT, 4],//任務(wù)狀態(tài),有 0(未執(zhí)行) 1(已執(zhí)行) 2(執(zhí)行中) 三種。 //注意:這里的執(zhí)行是一個(gè)容易誤解的地方,此處的執(zhí)行并不是指任務(wù)本身的執(zhí)行,而是值`任務(wù)投遞`這一操作的執(zhí)行,從宏觀上看換成 _未投遞_,_已投遞_,_投遞中_描述會(huì)更準(zhǔn)確。 ];此處為何要使用Swoole的內(nèi)存Table?
Swoft的的定時(shí)任務(wù)管理是分別由 任務(wù)計(jì)劃進(jìn)程 和 任務(wù)執(zhí)行進(jìn)程 進(jìn)程負(fù)責(zé)的。兩個(gè)進(jìn)程的運(yùn)行共同管理定時(shí)任務(wù),如果使用進(jìn)程間獨(dú)立的array()等結(jié)構(gòu),兩個(gè)進(jìn)程必然需要頻繁的進(jìn)程間通信。而使用跨進(jìn)程的Table(本文的Table,除非特別說明,都指Swoole的SwooleTable結(jié)構(gòu))直接進(jìn)行進(jìn)程間數(shù)據(jù)共享,不僅性能高,操作簡單 還解耦了兩個(gè)進(jìn)程。
為了Table能夠在兩個(gè)進(jìn)程間共同使用,Table必須在Swoole Server啟動(dòng)前創(chuàng)建并分配內(nèi)存。具體代碼在SwoftTaskBootstrapListeners->onBeforeStart()中,比較簡單,有興趣的可以自行閱讀。
背景介紹完了,我們來看看這兩個(gè)定時(shí)任務(wù)進(jìn)程的行為
//SwoftTaskBootstrapProcessCronTimerProcess.php /** * Crontab timer process * * @Process(name="cronTimer", boot=true) */ class CronTimerProcess implements ProcessInterface { /** * @param SwoftProcessProcess $process */ public function run(SwoftProcess $process) { //code.... /* @var SwoftTaskCrontabCrontab $cron*/ $cron = App::getBean("crontab"); // Swoole/HttpServer $server = App::$server->getServer(); $time = (60 - date("s")) * 1000; $server->after($time, function () use ($server, $cron) { // Every minute check all tasks, and prepare the tasks that next execution point needs $cron->checkTask(); $server->tick(60 * 1000, function () use ($cron) { $cron->checkTask(); }); }); } }
//SwoftTaskCrontabCrontab.php /** * 初始化runTimeTable數(shù)據(jù) * * @param array $task 任務(wù) * @param array $parseResult 解析crontab命令規(guī)則結(jié)果,即Task需要在當(dāng)前分鐘內(nèi)的哪些秒執(zhí)行 * @return bool */ private function initRunTimeTableData(array $task, array $parseResult): bool { $runTimeTableTasks = $this->getRunTimeTable()->table; $min = date("YmdHi"); $sec = strtotime(date("Y-m-d H:i")); foreach ($parseResult as $time) { $this->checkTaskQueue(false); $key = $this->getKey($task["rule"], $task["taskClass"], $task["taskMethod"], $min, $time + $sec); $runTimeTableTasks->set($key, [ "taskClass" => $task["taskClass"], "taskMethod" => $task["taskMethod"], "minute" => $min, "sec" => $time + $sec, "runStatus" => self::NORMAL ]); } return true; }
CronTimerProcess是Swoft的定時(shí)任務(wù)調(diào)度進(jìn)程,其核心方法是Crontab->initRunTimeTableData()。
該進(jìn)程使用了Swoole的定時(shí)器功能,通過SwooleTimer在每分鐘首秒時(shí)執(zhí)行的回調(diào),CronTimerProcess每次被喚醒后都會(huì)遍歷任務(wù)表計(jì)算出當(dāng)前這一分鐘內(nèi)的60秒分別需要執(zhí)行的任務(wù)清單,寫入執(zhí)行表并標(biāo)記為 未執(zhí)行。
//SwoftTaskBootstrapProcess /** * Crontab process * * @Process(name="cronExec", boot=true) */ class CronExecProcess implements ProcessInterface { /** * @param SwoftProcessProcess $process */ public function run(SwoftProcess $process) { $pname = App::$server->getPname(); $process->name(sprintf("%s cronexec process", $pname)); /** @var SwoftTaskCrontabCrontab $cron */ $cron = App::getBean("crontab"); // Swoole/HttpServer $server = App::$server->getServer(); $server->tick(0.5 * 1000, function () use ($cron) { $tasks = $cron->getExecTasks(); if (!empty($tasks)) { foreach ($tasks as $task) { // Diliver task Task::deliverByProcess($task["taskClass"], $task["taskMethod"]); $cron->finishTask($task["key"]); } } }); } }
CronExecProcess作為定時(shí)任務(wù)的執(zhí)行者,通過SwooleTimer每0.5s喚醒自身一次,然后把 執(zhí)行表 遍歷一次,挑選當(dāng)下需要執(zhí)行的任務(wù),通過sendMessage()投遞出去并更新該 任務(wù)執(zhí)行表中的狀態(tài)。
該執(zhí)行進(jìn)程只負(fù)責(zé)任務(wù)的投遞,任務(wù)的實(shí)際實(shí)際執(zhí)行仍然在Task進(jìn)程中由TaskExecutor處理。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/29080.html
摘要:作者鏈接來源簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。同時(shí)順手整理個(gè)人對(duì)源碼的相關(guān)理解,希望能夠稍微填補(bǔ)學(xué)習(xí)領(lǐng)域的空白。系列文章只會(huì)節(jié)選關(guān)鍵代碼輔以思路講解,請(qǐng)自行配合源碼閱讀。 作者:bromine鏈接:https://www.jianshu.com/p/2f6...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。Swoft...
摘要:和服務(wù)關(guān)系最密切的進(jìn)程是中的進(jìn)程組,絕大部分業(yè)務(wù)處理都在該進(jìn)程中進(jìn)行。隨后觸發(fā)一個(gè)事件各組件通過該事件進(jìn)行配置文件加載路由注冊(cè)。事件每個(gè)請(qǐng)求到來時(shí)僅僅會(huì)觸發(fā)事件。服務(wù)器生命周期和服務(wù)基本一致,詳情參考源碼剖析功能實(shí)現(xiàn) 作者:bromine鏈接:https://www.jianshu.com/p/4c0...來源:簡書著作權(quán)歸作者所有,本文已獲得作者授權(quán)轉(zhuǎn)載,并對(duì)原文進(jìn)行了重新的排版。S...
摘要:在中的應(yīng)用官網(wǎng)源碼解讀號(hào)外號(hào)外歡迎大家我們開發(fā)組定了一個(gè)就線下聚一次的小目標(biāo)上一篇源碼解讀反響還不錯(cuò)不少同學(xué)推薦再加一篇講解一下中使用到的功能幫助大家開啟的實(shí)戰(zhàn)之旅服務(wù)器開發(fā)涉及到的相關(guān)技術(shù)領(lǐng)域的知識(shí)非常多不日積月累打好基礎(chǔ)是很難真正 date: 2017-12-14 21:34:51title: swoole 在 swoft 中的應(yīng)用 swoft 官網(wǎng): https://www.sw...
摘要:官網(wǎng)源碼解讀號(hào)外號(hào)外歡迎大家我們開發(fā)組定了一個(gè)就線下聚一次的小目標(biāo)里面的框架算是非常重的了這里的重先不具體到性能層面主要是框架的設(shè)計(jì)思想和框架集成的服務(wù)讓框架可以既可以快速解決很多問題又可以輕松擴(kuò)展中的框架有在應(yīng)該無出其右了這次解讀的源碼 官網(wǎng): https://www.swoft.org/源碼解讀: http://naotu.baidu.com/file/8... 號(hào)外號(hào)外, 歡迎大...
摘要:我們項(xiàng)目使用的是框架,所以我就想到用框架的定時(shí)器。,以及的結(jié)構(gòu)注在定時(shí)器這塊使用到兩個(gè)一個(gè)是用于存儲(chǔ)任務(wù)的實(shí)例。 這兩天老大給了個(gè)需求想把商城熱點(diǎn)數(shù)據(jù)同步到redis緩存。我們項(xiàng)目使用的是swoft框架,所以我就想到用框架的Crontab定時(shí)器。但是在測(cè)試的時(shí)候發(fā)現(xiàn)把Table的size設(shè)置為1024時(shí)(實(shí)際上設(shè)置為任何大小都一樣,貼上swoole的解釋)發(fā)現(xiàn)內(nèi)存溢出了 showImg...
閱讀 976·2023-04-25 18:51
閱讀 1961·2021-09-09 11:39
閱讀 3341·2019-08-30 15:53
閱讀 2144·2019-08-30 13:03
閱讀 1357·2019-08-29 16:17
閱讀 637·2019-08-29 11:33
閱讀 1946·2019-08-26 14:00
閱讀 2174·2019-08-26 13:41