摘要:上游水源通過(guò)里中的方法流入水電站中。當(dāng)在接收數(shù)據(jù)中出現(xiàn)錯(cuò)誤時(shí)發(fā)出。暫??勺x流,不再發(fā)出事件恢復(fù)可讀流,繼續(xù)發(fā)出事件把這個(gè)可讀流的輸出傳遞給指定的流,兩個(gè)流組成一個(gè)管道。
題外話
該文章整合了多篇網(wǎng)絡(luò)文章(整合之處已設(shè)置超鏈接,可點(diǎn)擊直接了解原文),目的僅僅是為了和大伙分享,更加通俗易懂的了解流的各個(gè)流程的初始。本人也是node的初學(xué)菜鳥(niǎo),有描述錯(cuò)誤或誤人子弟的地方多請(qǐng)大神們多多指出。
readable 我們先來(lái)安利一些思路,方便理清楚邏輯:)。事件 查看原文讀緩沖區(qū)(readable buffer):這里的讀是個(gè)形容詞,是指可讀流臨時(shí)存放data(只能是字符串或者Buffer,不能是數(shù)字)的緩沖區(qū)。(讀緩沖區(qū)就像一個(gè)水電站一樣,感覺(jué)這樣描述比較好理解flowing、paused模式)
flowing模式:即流動(dòng)模式,就像打開(kāi)水電站的水閘一樣,上游的水和下游完完全全連通直到上游來(lái)源的數(shù)據(jù)耗盡。
paused模式:即暫停模式,就像水電站的水閘在你指定的時(shí)候(使用stream.read())才會(huì)打開(kāi)。不過(guò),當(dāng)你使用read()打開(kāi)水閘的時(shí)候是一個(gè)超自然現(xiàn)象---水電站里的水瞬間被抽干,上游的水還沒(méi)來(lái)得及填充水電站。然后自動(dòng)關(guān)閉水閘,等待你的下一次“惠顧“read()。
_read:上游水源通過(guò)_read里中的push、unshift方法流入水電站中。
函數(shù) 查看原文readable:在數(shù)據(jù)塊可以從流中讀取的時(shí)候發(fā)出。它對(duì)應(yīng)的處理器沒(méi)有參數(shù),可以在處理器里調(diào)用read([size])方法讀取數(shù)據(jù)。
data:有數(shù)據(jù)可讀時(shí)發(fā)出。它對(duì)應(yīng)的處理器有一個(gè)參數(shù),代表數(shù)據(jù)。如果你只想快快地讀取一個(gè)流的數(shù)據(jù),給data關(guān)聯(lián)一個(gè)處理器是最方便的辦法。處理器的參數(shù)是Buffer對(duì)象,如果你調(diào)用了Readable的setEncoding(encoding)方法,處理器的參數(shù)就是String對(duì)象。
end:當(dāng)數(shù)據(jù)被讀完時(shí)發(fā)出。對(duì)應(yīng)的處理器沒(méi)有參數(shù)。
close:當(dāng)?shù)讓拥馁Y源,如文件,已關(guān)閉時(shí)發(fā)出。不是所有的Readable流都會(huì)發(fā)出這個(gè)事件。對(duì)應(yīng)的處理器沒(méi)有參數(shù)。
error:當(dāng)在接收數(shù)據(jù)中出現(xiàn)錯(cuò)誤時(shí)發(fā)出。對(duì)應(yīng)的處理器參數(shù)是Error的實(shí)例,它的message屬性描述了錯(cuò)誤原因,stack屬性保存了發(fā)生錯(cuò)誤時(shí)的堆棧信息。
流動(dòng)模式和暫停模式切換 查看原文read([size]):如果你給read方法傳遞了一個(gè)大小作為參數(shù),那它會(huì)返回指定數(shù)量的數(shù)據(jù),如果數(shù)據(jù)不足,就會(huì)返回null。如果你不給read方法傳參,它會(huì)返回內(nèi)部緩沖區(qū)里的所有數(shù)據(jù),如果沒(méi)有數(shù)據(jù),會(huì)返回null,此時(shí)有可能說(shuō)明遇到了文件末尾。read返回的數(shù)據(jù)可能是Buffer對(duì)象,也可能是String對(duì)象。
setEncoding(encoding):給流設(shè)置一個(gè)編碼格式,用于解碼讀到的數(shù)據(jù)。調(diào)用此方法后,read([size])方法返回String對(duì)象。
pause():暫停可讀流,不再發(fā)出data事件
resume():恢復(fù)可讀流,繼續(xù)發(fā)出data事件
pipe(destination,[options]):把這個(gè)可讀流的輸出傳遞給destination指定的Writable流,兩個(gè)流組成一個(gè)管道。options是一個(gè)JS對(duì)象,這個(gè)對(duì)象有一個(gè)布爾類型的end屬性,默認(rèn)值為true,當(dāng)end為true時(shí),Readable結(jié)束時(shí)自動(dòng)結(jié)束Writable。注意,我們可以把一個(gè)Readable與若干Writable連在一起,組成多個(gè)管道,每一個(gè)Writable都能得到同樣的數(shù)據(jù)。這個(gè)方法返回destination,如果destination本身又是Readable流,就可以級(jí)聯(lián)調(diào)用pipe(比如我們?cè)谑褂胓zip壓縮、解壓縮時(shí)就會(huì)這樣,馬上會(huì)講到)。
unpipe([destination]):端口與指定destination的管道。不傳遞destination時(shí),斷開(kāi)與這個(gè)可讀流連在一起的所有管道。
通過(guò)添加 data 事件監(jiān)聽(tīng)器來(lái)啟動(dòng)數(shù)據(jù)監(jiān)聽(tīng)
調(diào)用 resume() 方法啟動(dòng)數(shù)據(jù)流
調(diào)用 pipe() 方法將數(shù)據(jù)轉(zhuǎn)接到另一個(gè) 可寫流
觸發(fā)準(zhǔn)備數(shù)據(jù)(_read)的方法在流沒(méi)有 pipe() 時(shí),調(diào)用 pause() 方法可以將流暫停
pipe() 時(shí),需要移除所有 data 事件的監(jiān)聽(tīng),再調(diào)用 unpipe() 方法
工作流程 查看原文data listener
readable listener
read()——如果當(dāng)前緩沖區(qū)為空,或者緩沖區(qū)并未超出我們?cè)O(shè)定的最大值,那么就可以繼續(xù)準(zhǔn)備數(shù)據(jù);如果此時(shí)正在準(zhǔn)備數(shù)據(jù)(_read())或者已經(jīng)結(jié)束讀?。╬ush(null)),那么就放棄準(zhǔn)備數(shù)據(jù)。
1.在paused模式下則讀取全部緩沖區(qū)的長(zhǎng)度;若讀取的字節(jié)數(shù)(n)大于設(shè)置的緩沖區(qū)最大值,則適當(dāng)擴(kuò)大緩沖區(qū)的大?。J(rèn)為16k,最大為8m);若讀取的長(zhǎng)度大于當(dāng)前緩沖區(qū)的大小,設(shè)置needReadable屬性并準(zhǔn)備數(shù)據(jù)等待下一次讀取。
2.如果當(dāng)前緩沖區(qū)為空,或者緩沖區(qū)并未超出我們?cè)O(shè)定的最大值,那么就可以繼續(xù)準(zhǔn)備數(shù)據(jù);如果此時(shí)正在準(zhǔn)備數(shù)據(jù)(_read())或者已經(jīng)結(jié)束讀?。╬ush(null)),那么就放棄準(zhǔn)備數(shù)據(jù)。
3.針對(duì)這個(gè)私有方法_read,文檔上有特殊說(shuō)明,自定義的Readable實(shí)現(xiàn)類需要實(shí)現(xiàn)這個(gè)方法,在該方法中手動(dòng)添加數(shù)據(jù)到Readable對(duì)象的讀緩沖區(qū),然后進(jìn)行Readable的讀取。可以理解為_(kāi)read函數(shù)為讀取數(shù)據(jù)前的準(zhǔn)備工作(準(zhǔn)備數(shù)據(jù)),針對(duì)的是流的實(shí)現(xiàn)者而言。
1.對(duì)于處在flowing模式下的讀取,每次只讀緩沖區(qū)中第一個(gè)buffer的長(zhǎng)度
2.針對(duì)這個(gè)私有方法_read,文檔上有特殊說(shuō)明,自定義的Readable實(shí)現(xiàn)類需要實(shí)現(xiàn)這個(gè)方法,在該方法中手動(dòng)添加數(shù)據(jù)到Readable對(duì)象的讀緩沖區(qū),然后進(jìn)行Readable的讀取??梢岳斫鉃開(kāi)read函數(shù)為讀取數(shù)據(jù)前的準(zhǔn)備工作(準(zhǔn)備數(shù)據(jù)),針對(duì)的是流的實(shí)現(xiàn)者而言。
實(shí)例//這是一個(gè)將存放多條json字符串的txt文件讀取成json的例子 const stream = require("stream"); const fs = require("fs"); const util = require("util"); function JSONLineReader(source) { stream.Readable.call(this); this._source = source; this._foundLineEnd = false; this._buffer = ""; source.on("readable", function() {//監(jiān)聽(tīng)source什么時(shí)候準(zhǔn)備好,那么我們就可以用read()或則readable listener去觸發(fā)JSONLineReader的_read方法 this.read(); // this.on("readable", function(data) { // console.log("readable"); // }); }.bind(this)) } util.inherits(JSONLineReader, stream.Readable); JSONLineReader.prototype._read = function(size) { var chunk; var line; var lineIndex; var result; if (this._buffer.length === 0) { chunk = this._source.read(); this._buffer += chunk; //一次就拿完 只是看什么時(shí)候push null } lineIndex = this._buffer.indexOf(" "); if (lineIndex !== -1) { line = this._buffer.slice(0, lineIndex); if (line) { result = JSON.parse(line); this._buffer = this._buffer.slice(lineIndex + 1); this.emit("object", result);util.inspect(result)) this.push(util.inspect(result)); } else { this._buffer = this._buffer.slice(1); } } } let input = fs.createReadStream(__dirname + "/json-lines.txt", { encoding: "utf8" }); var jsonLineReader = new JSONLineReader(input); jsonLineReader.on("object", function(obj) { console.log("pos:", obj); }) /*json-lines.txt {"success":false,"code":501} {"success":true,"code":202} {"success":false,"code":503} {"success":true,"code":204} {"success":false,"code":505} {"success":true,"code":206} {"success":false,"code":507} {"success":true,"code":208} {"success":false,"code":509} */
let stream = require("stream"); let util = require("util"); util.inherits(flowingReadableDemo, stream.Readable); function flowingReadableDemo(opt) { stream.Readable.call(this, opt); this.quotes = ["yessdasdsa", "noasdasdas", "maybe"]; this._index = 0; } flowingReadableDemo.prototype._read = function() { if (this._index >= this.quotes.length) { this.push(null); } else { this.push(this.quotes[this._index]); this._index += 1; } }; let r = new flowingReadableDemo(); r.on("data", function(data) { console.log("Callback read: " + data.toString()); // flowing狀態(tài)下,我們無(wú)需執(zhí)行read,僅需要設(shè)置data事件處理函數(shù)或者設(shè)定導(dǎo)流目標(biāo)pipe }); r.on("end", function(data) { console.log("No more answers."); });
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/87390.html
摘要:內(nèi)部架構(gòu)上圖表示一個(gè)實(shí)例的組成部分部分緩沖數(shù)組內(nèi)部函數(shù)部分緩沖鏈表內(nèi)部函數(shù)實(shí)例必須實(shí)現(xiàn)的內(nèi)部函數(shù)以及系統(tǒng)提供的回調(diào)函數(shù)。有三個(gè)參數(shù),第一個(gè)為待處理的數(shù)據(jù),第二個(gè)為編碼,第三個(gè)為回調(diào)函數(shù)。 Transform流特性 在開(kāi)發(fā)中直接接觸Transform流的情況不是很多,往往是使用相對(duì)成熟的模塊或者封裝的API來(lái)完成流的處理,最為特殊的莫過(guò)于through2模塊和gulp流操作。那么,Tra...
摘要:回調(diào)函數(shù)中檢測(cè)該次寫入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫流表示該寫入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ?。監(jiān)聽(tīng)源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫流。 在Node.js中,流(Stream)是其眾多原生對(duì)象的基類,它對(duì)處理潛在的大文件提供了支持,也抽象了一些場(chǎng)景下的數(shù)據(jù)處理和傳遞。在它對(duì)外暴露的接口中,最為神奇的,莫過(guò)于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node...
摘要:是消費(fèi)數(shù)據(jù)的,從中獲取數(shù)據(jù),然后對(duì)得到的塊數(shù)據(jù)進(jìn)行處理,至于如何處理,就依賴于具體實(shí)現(xiàn)也就是的實(shí)現(xiàn)。也可以說(shuō)是建立在的基礎(chǔ)上。 1. 認(rèn)識(shí)Stream Stream的概念最早來(lái)源于Unix系統(tǒng),其可以將一個(gè)大型系統(tǒng)拆分成一些小的組件,然后將這些小的組件可以很好地運(yùn)行 TCP/IP協(xié)議中的TCP協(xié)議也用到了Stream的思想,進(jìn)而可以進(jìn)行流量控制、差錯(cuò)控制 在unix中通過(guò) |來(lái)表示流...
摘要:方法也可以接收一個(gè)參數(shù)表示數(shù)據(jù)請(qǐng)求著請(qǐng)求的數(shù)據(jù)大小,但是可讀流可以根據(jù)需要忽略這個(gè)參數(shù)。讀取數(shù)據(jù)大部分情況下我們只要簡(jiǎn)單的使用方法將可讀流的數(shù)據(jù)重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數(shù)據(jù)更有用。 介紹本文介紹了使用 node.js streams 開(kāi)發(fā)程序的基本方法。 We should have some ways of connecting programs ...
摘要:事件的觸發(fā)頻次同樣是由實(shí)現(xiàn)者決定,譬如在進(jìn)行文件讀取時(shí),可能每行都會(huì)觸發(fā)一次而在請(qǐng)求處理時(shí),可能數(shù)的數(shù)據(jù)才會(huì)觸發(fā)一次。如果有參數(shù)傳入,它會(huì)讓可讀流停止流向某個(gè)特定的目的地,否則,它會(huì)移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文節(jié)選自 Node.js Chea...
閱讀 4723·2021-10-25 09:48
閱讀 3293·2021-09-07 09:59
閱讀 2353·2021-09-06 15:01
閱讀 2799·2021-09-02 15:21
閱讀 2775·2019-08-30 14:14
閱讀 2260·2019-08-29 13:59
閱讀 2580·2019-08-29 11:02
閱讀 2592·2019-08-26 13:33