摘要:回調(diào)函數(shù)中檢測(cè)該次寫(xiě)入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫(xiě)流表示該寫(xiě)入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ?。監(jiān)聽(tīng)源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫(xiě)流。
在Node.js中,流(Stream)是其眾多原生對(duì)象的基類(lèi),它對(duì)處理潛在的大文件提供了支持,也抽象了一些場(chǎng)景下的數(shù)據(jù)處理和傳遞。在它對(duì)外暴露的接口中,最為神奇的,莫過(guò)于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node.js中的部分源碼,也來(lái)從源碼層面分享下導(dǎo)流的具體實(shí)現(xiàn)。
正題以下是一個(gè)關(guān)于導(dǎo)流的簡(jiǎn)單例子:
"use strict" import {createReadStream, createWriteStream} from "fs" createReadStream("/path/to/a/big/file").pipe(createWriteStream("/path/to/the/dest"))
再結(jié)合官方文檔,我們可以把pipe方法的主要功能分解為:
不斷從來(lái)源可讀流中獲得一個(gè)指定長(zhǎng)度的數(shù)據(jù)。
將獲取到的數(shù)據(jù)寫(xiě)入目標(biāo)可寫(xiě)流。
平衡讀取和寫(xiě)入速度,防止讀取速度大大超過(guò)寫(xiě)入速度時(shí),出現(xiàn)大量滯留數(shù)據(jù)。
好,讓我們跟隨Node.js項(xiàng)目里lib/_stream_readable.js和lib/_stream_writable.js中的代碼,逐個(gè)解析這三個(gè)主要功能的實(shí)現(xiàn)。
讀取數(shù)據(jù)剛創(chuàng)建出的可讀流只是一個(gè)記錄了一些初始狀態(tài)的空殼,里面沒(méi)有任何數(shù)據(jù),并且其狀態(tài)不屬于官方文檔中的流動(dòng)模式(flowing mode)和暫停模式(paused mode)中的任何一種,算是一種偽暫停模式,因?yàn)榇藭r(shí)實(shí)例的狀態(tài)中記錄它是否為暫停模式的變量還不是標(biāo)準(zhǔn)的布爾值,而是null,但又可通過(guò)將暫停模式轉(zhuǎn)化為流動(dòng)模式的行為(調(diào)用實(shí)例的resume()方法),將可讀流切換至流動(dòng)模式。在外部代碼中,我們可以手動(dòng)監(jiān)聽(tīng)可讀流的data事件,讓其進(jìn)入流動(dòng)模式:
// lib/_stream_readable.js // ... Readable.prototype.on = function(ev, fn) { var res = Stream.prototype.on.call(this, ev, fn); if (ev === "data" && false !== this._readableState.flowing) { this.resume(); } // ... return res; };
可見(jiàn),可讀流類(lèi)通過(guò)二次封裝父類(lèi)(EventEmitter)的on()方法,替我們?cè)诒O(jiān)聽(tīng)data事件時(shí),將流切換至了流動(dòng)模式。而開(kāi)始讀取數(shù)據(jù)的動(dòng)作,則存在于resume()方法調(diào)用的內(nèi)部方法resume_()中,讓我們一窺究竟:
// lib/_stream_readable.js // ... function resume_(stream, state) { if (!state.reading) { debug("resume read 0"); stream.read(0); } // ... }
通過(guò)向可讀流讀取一次空數(shù)據(jù)(大小為0),將會(huì)觸發(fā)實(shí)例層面實(shí)現(xiàn)的_read()方法,開(kāi)始讀取數(shù)據(jù),然后利用讀到的數(shù)據(jù)觸發(fā)data事件:
// lib/_stream_readable.js // ... Readable.prototype.read = function(n) { // ... // 此次判斷的意圖為,如果可讀流的緩沖中已滿,則只空觸發(fā)readable事件。 if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { if (state.length === 0 && state.ended) endReadable(this); else emitReadable(this); return null; } // 若可讀流已經(jīng)被傳入了終止符(null),且緩沖中沒(méi)有遺留數(shù)據(jù),則結(jié)束這個(gè)可讀流 if (n === 0 && state.ended) { if (state.length === 0) endReadable(this); return null; } // 若目前緩沖中的數(shù)據(jù)大小為空,或未超過(guò)設(shè)置的警戒線,則進(jìn)行一次數(shù)據(jù)讀取。 if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; } if (doRead) { // ... this._read(state.highWaterMark); } // ... if (ret !== null) this.emit("data", ret); return ret; };
可見(jiàn),在可讀流的read()方法內(nèi)部,通過(guò)調(diào)用在實(shí)例層面實(shí)現(xiàn)的_read(size)方法,取得了一段(設(shè)置的警戒線)大小的數(shù)據(jù),但是,你可能會(huì)疑惑,這只是讀取了一次數(shù)據(jù)啊,理想情況下,應(yīng)該是循環(huán)調(diào)用_read(size)直至取完所有數(shù)據(jù)才對(duì)啊?。科鋵?shí),這部分的邏輯存在于我們實(shí)現(xiàn)_read(size)方法時(shí),在其內(nèi)部調(diào)用的this.push(data)方法中,在最后其會(huì)調(diào)用私有方法maybeReadMore_(),再次觸發(fā)read(0),接著在read(0)函數(shù)的代碼中再次判斷可讀流是否能夠結(jié)束,否則再進(jìn)行一次_read(size)讀?。?/p>
// lib/_stream_readable.js // ... Readable.prototype.push = function(chunk, encoding) { var state = this._readableState; // ... return readableAddChunk(this, state, chunk, encoding, false); }; function readableAddChunk(stream, state, chunk, encoding, addToFront) { // ... if (er) { stream.emit("error", er); } else if (chunk === null) { state.reading = false; onEofChunk(stream, state); // 當(dāng)傳入終止符時(shí),將可讀流的結(jié)束標(biāo)識(shí)(state.ended)設(shè)為true } // ... maybeReadMore(stream, state); } } // ... } function maybeReadMore(stream, state) { if (!state.readingMore) { // ... process.nextTick(maybeReadMore_, stream, state); } } function maybeReadMore_(stream, state) { // ... stream.read(0); } function onEofChunk(stream, state) { if (state.ended) return; // ... state.ended = true; // ... }
好的,此時(shí)從可讀流中讀取數(shù)據(jù)的整個(gè)核心流程已經(jīng)實(shí)現(xiàn)了,讓我們歸納一下:
剛創(chuàng)建出的可讀流只是一個(gè)空殼,保存著一些初始狀態(tài)。
監(jiān)聽(tīng)它的data事件,將會(huì)自動(dòng)調(diào)用該可讀流的resume()方法,使流切換至流動(dòng)模式。
在resume()方法的內(nèi)部函數(shù)_resume()中,對(duì)可讀流進(jìn)行了一次read(0)調(diào)用。
read(0)調(diào)用的內(nèi)部,首先檢查流是否符合了結(jié)束條件,若符合,則結(jié)束之。否則調(diào)用實(shí)例實(shí)現(xiàn)的_read(size)方法讀取一段預(yù)設(shè)的警戒線(highWaterMark)大小的數(shù)據(jù)。
在實(shí)例實(shí)現(xiàn)_read(size)函數(shù)時(shí)內(nèi)部調(diào)用的this.push(data)方法里,會(huì)先判斷的讀到的數(shù)據(jù)是否為結(jié)束符,若是,則將流的狀態(tài)設(shè)為結(jié)束,然后再一次對(duì)可讀流調(diào)用read(0)。
寫(xiě)入數(shù)據(jù)和可讀流一樣,剛創(chuàng)建出的可寫(xiě)流也只是一個(gè)記錄了相關(guān)狀態(tài)(包括預(yù)設(shè)的寫(xiě)入緩沖大?。┑目諝?。直接調(diào)用它的write方法,該方法會(huì)在其內(nèi)部調(diào)用writeOrBuffer函數(shù)來(lái)對(duì)數(shù)據(jù)是否可以直接一次性全部寫(xiě)入進(jìn)行判斷:
// lib/_stream_writable.js // ... function writeOrBuffer(stream, state, chunk, encoding, cb) { // ... var ret = state.length < state.highWaterMark; // 記錄可寫(xiě)流是否需要出發(fā)drain事件 if (!ret) state.needDrain = true; if (state.writing || state.corked) { // 若可寫(xiě)流正在被寫(xiě)入或被人工阻塞,則先將寫(xiě)入操作排隊(duì) // ... } else { doWrite(stream, state, false, len, chunk, encoding, cb); } return ret; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { // ... if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); // ... }
從代碼中可知,在writeOrBuffer函數(shù)記錄下了數(shù)據(jù)是否可以被一次性寫(xiě)入后,調(diào)用了實(shí)例層實(shí)現(xiàn)的_write()或_writev()方法進(jìn)行了實(shí)際的寫(xiě)入操作。那么,如果不能一次性寫(xiě)入完畢,那么在真正寫(xiě)入完畢時(shí),又是如何進(jìn)行通知的呢?嗯,答案就在設(shè)置的state.onwrite回調(diào)中:
// lib/_stream_writable.js // ... function onwrite(stream, er) { // ... if (er) onwriteError(stream, state, sync, er, cb); else { // ... if (sync) { process.nextTick(afterWrite, stream, state, finished, cb); } else { afterWrite(stream, state, finished, cb); } } } function afterWrite(stream, state, finished, cb) { if (!finished) onwriteDrain(stream, state); // ... } function onwriteDrain(stream, state) { if (state.length === 0 && state.needDrain) { state.needDrain = false; stream.emit("drain"); } }
可見(jiàn),在回調(diào)函數(shù)的執(zhí)行中,會(huì)對(duì)該可寫(xiě)流該次被寫(xiě)入的數(shù)據(jù)是否超過(guò)了警戒線的狀態(tài)進(jìn)行判斷,如果是,則觸發(fā)drain事件,進(jìn)行通知。
我們也可以調(diào)用end()方法來(lái)表明要結(jié)束這個(gè)寫(xiě)入流,并進(jìn)行最后一次寫(xiě)入,end()方法的內(nèi)部最終會(huì)調(diào)用endWritable()函數(shù)來(lái)講可寫(xiě)流的狀態(tài)切換為已結(jié)束:
// lib/_stream_writable.js // ... function endWritable(stream, state, cb) { // ... state.ended = true; stream.writable = false; }
此時(shí),向可寫(xiě)流中寫(xiě)入數(shù)據(jù)的整個(gè)核心流程已經(jīng)實(shí)現(xiàn)了,這個(gè)流程和可寫(xiě)流的循環(huán)讀取流程不同,它是直線的,歸納一下:
剛創(chuàng)建出的可寫(xiě)流只是一個(gè)空殼,保存著一些初始狀態(tài)。
調(diào)用write()方法,其內(nèi)部的writeOrBuffer()檢測(cè)該次寫(xiě)入的數(shù)據(jù)是否需要被暫存在緩沖區(qū)中。
writeOrBuffer()函數(shù)調(diào)用實(shí)例實(shí)現(xiàn)的_write()或_writev()方法,進(jìn)行實(shí)際的寫(xiě)入,完成后調(diào)用回調(diào)函數(shù)state.onwrite。
回調(diào)函數(shù)中檢測(cè)該次寫(xiě)入是否被緩沖,若是,觸發(fā)drain事件。
重復(fù)以上過(guò)程,直至調(diào)用end()方法結(jié)束該可寫(xiě)流。
導(dǎo)流在摸清了從可讀流中讀數(shù)據(jù),和向可寫(xiě)流中寫(xiě)數(shù)據(jù)實(shí)現(xiàn)的核心流程后,Node.js中實(shí)現(xiàn)導(dǎo)流的核心流程其實(shí)已經(jīng)呼之欲出了。首先,為了開(kāi)始從源可讀流讀取數(shù)據(jù),在pipe()方法的內(nèi)部,它主動(dòng)為源可讀流添加了data事件的監(jiān)聽(tīng)函數(shù):
// lib/_stream_readable.js // ... Readable.prototype.pipe = function(dest, pipeOpts) { // ... src.on("data", ondata); function ondata(chunk) { // ... src.pause(); } } // ... return dest; };
從代碼中可見(jiàn),若向目標(biāo)可寫(xiě)流寫(xiě)入一次數(shù)據(jù)時(shí),目標(biāo)可寫(xiě)流表示該次寫(xiě)入它需要進(jìn)行緩沖,則主動(dòng)將源可讀流切換至?xí)和DJ?。那么,源可讀流通過(guò)什么手段得知可以再次讀取數(shù)據(jù)并寫(xiě)入呢?嗯,通過(guò)監(jiān)聽(tīng)目標(biāo)可寫(xiě)流的drain事件:
// lib/_stream_readable.js // ... Readable.prototype.pipe = function(dest, pipeOpts) { // ... var ondrain = pipeOnDrain(src); dest.on("drain", ondrain); // ... return dest; }; function pipeOnDrain(src) { return function() { var state = src._readableState; // 目標(biāo)可寫(xiě)流可能會(huì)存在多次寫(xiě)入需要進(jìn)行緩沖的情況,需確保所有需要緩沖的寫(xiě)入都 // 完成后,再次將可讀流切換至流動(dòng)模式。 if (state.awaitDrain) state.awaitDrain--; if (state.awaitDrain === 0 && EE.listenerCount(src, "data")) { state.flowing = true; flow(src); } }; }
最后,監(jiān)聽(tīng)源可讀流的結(jié)束事件,對(duì)應(yīng)著結(jié)束目標(biāo)可寫(xiě)流:
// lib/_stream_readable.js // ... Readable.prototype.pipe = function(dest, pipeOpts) { // ... var endFn = doEnd ? onend : cleanup; if (state.endEmitted) process.nextTick(endFn); else src.once("end", endFn); function onend() { debug("onend"); dest.end(); } // ... return dest; };
由于前面的鋪墊,實(shí)際導(dǎo)流操作的核心流程其實(shí)實(shí)現(xiàn)得非常輕松,歸納一下:
主動(dòng)監(jiān)聽(tīng)源可讀流的data事件,在該事件的監(jiān)聽(tīng)函數(shù)中,向目標(biāo)可寫(xiě)流寫(xiě)入數(shù)據(jù)。
若目標(biāo)可寫(xiě)流表示該寫(xiě)入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ健?/p>
監(jiān)聽(tīng)目標(biāo)可寫(xiě)流的drain事件,當(dāng)目標(biāo)可寫(xiě)流里所有需要緩沖的寫(xiě)入操作都完畢后,將流重新切換回流動(dòng)模式。
監(jiān)聽(tīng)源可讀流的end事件,相應(yīng)地結(jié)束目標(biāo)可寫(xiě)流。
最后Node.js中流的實(shí)際實(shí)現(xiàn)其實(shí)非常龐大,復(fù)雜,精妙。每一個(gè)流的內(nèi)部,都管理著大量狀態(tài)。本文僅僅只是在龐大的流的實(shí)現(xiàn)中,選擇了一條主線,進(jìn)行了闡述。大家如果有閑,非常推薦完整地閱讀一遍其實(shí)現(xiàn)。
參考:
https://github.com/nodejs/node/blob/master/lib/_stream_readable.js
https://github.com/nodejs/node/blob/master/lib/_stream_writable.js
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/78652.html
摘要:是在完成處理數(shù)據(jù)塊后需要調(diào)用的函數(shù)。這是寫(xiě)數(shù)據(jù)成功與否的標(biāo)志。若要發(fā)出故障信號(hào),請(qǐng)用錯(cuò)誤對(duì)象調(diào)用回調(diào)函數(shù)。雙工流的可讀性和可寫(xiě)性操作完全獨(dú)立于彼此。這僅僅是將兩個(gè)特性組合成一個(gè)對(duì)象。 showImg(https://segmentfault.com/img/remote/1460000013228112?w=533&h=300); Streams 是一個(gè)數(shù)據(jù)集——和數(shù)組、字符串一樣。不...
摘要:當(dāng)一個(gè)客戶端的響應(yīng)對(duì)象是一個(gè)可讀流,那么在服務(wù)器端這就是一個(gè)可寫(xiě)流。的模塊給我們提供了一個(gè)可以操作任何文件的可讀流通過(guò)方法創(chuàng)建。創(chuàng)建一個(gè)可讀流創(chuàng)建可讀流,我們需要類(lèi)創(chuàng)建一個(gè)可讀流非常簡(jiǎn)單。可以通過(guò)修改可讀流配置里面的方法實(shí)現(xiàn)。 Node.js的stream模塊是有名的應(yīng)用困難,更別說(shuō)理解了。那現(xiàn)在可以告訴你,這些都不是問(wèn)題了。 多年來(lái),開(kāi)發(fā)人員在那里創(chuàng)建了大量的軟件包,其唯一目的就是使...
摘要:在這樣的程序中,異步編程通常是有幫助的。最初是為了使異步編程簡(jiǎn)單方便而設(shè)計(jì)的。在年設(shè)計(jì)時(shí),人們已經(jīng)在瀏覽器中進(jìn)行基于回調(diào)的編程,所以該語(yǔ)言的社區(qū)用于異步編程風(fēng)格。 來(lái)源:ApacheCN『JavaScript 編程精解 中文第三版』翻譯項(xiàng)目原文:Node.js 譯者:飛龍 協(xié)議:CC BY-NC-SA 4.0 自豪地采用谷歌翻譯 部分參考了《JavaScript 編程精解(第 2 版)...
摘要:方法也可以接收一個(gè)參數(shù)表示數(shù)據(jù)請(qǐng)求著請(qǐng)求的數(shù)據(jù)大小,但是可讀流可以根據(jù)需要忽略這個(gè)參數(shù)。讀取數(shù)據(jù)大部分情況下我們只要簡(jiǎn)單的使用方法將可讀流的數(shù)據(jù)重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數(shù)據(jù)更有用。 介紹本文介紹了使用 node.js streams 開(kāi)發(fā)程序的基本方法。 We should have some ways of connecting programs ...
摘要:在可讀流事件里我們就必須調(diào)用方法。當(dāng)一個(gè)對(duì)象就意味著我們想發(fā)出信號(hào)這個(gè)流沒(méi)有更多數(shù)據(jù)了自定義可寫(xiě)流為了實(shí)現(xiàn)可寫(xiě)流,我們需要使用流模塊中的構(gòu)造函數(shù)。我們只需給構(gòu)造函數(shù)傳遞一些選項(xiàng)并創(chuàng)建一個(gè)對(duì)象。 前言 什么是流呢?看字面意思,我們可能會(huì)想起生活中的水流,電流。但是流不是水也不是電,它只是描述水和電的流動(dòng);所以說(shuō)流是抽象的。在node.js中流是一個(gè)抽象接口,它不關(guān)心文件內(nèi)容,只關(guān)注是否從...
閱讀 1635·2021-11-19 11:38
閱讀 3634·2021-11-15 11:37
閱讀 870·2021-09-30 09:48
閱讀 1097·2021-09-29 09:46
閱讀 963·2021-09-23 11:22
閱讀 1946·2019-08-30 15:44
閱讀 3472·2019-08-26 13:58
閱讀 2437·2019-08-26 13:26