摘要:等文件一旦打開,立刻執(zhí)行寫入操作發(fā)射一個緩存區(qū)清空的事件自定義可寫流為了實現(xiàn)可寫流,我們需要使用流模塊中的構(gòu)造函數(shù)。
流的基本概念及理解
流是一種數(shù)據(jù)傳輸手段,是有順序的,有起點和終點,比如你要把數(shù)據(jù)從一個地方傳到另外一個地方
流非常重要,gulp,webpack,HTTP里的請求和響應(yīng),http里的socket都是流,包括后面壓縮,加密等流為什么這么好用還這么重要呢?
因為有時候我們不關(guān)心文件的主體內(nèi)容,只關(guān)心能不能取到數(shù)據(jù),取到數(shù)據(jù)之后怎么進(jìn)行處理
對于小型的文本文件,我們可以把文件內(nèi)容全部讀入內(nèi)存,然后再寫入文件,比如grunt-file-copy
對于體積較大的二進(jìn)制文件,比如音頻、視頻文件,動輒幾個GB大小,如果使用這種方法,很容易使內(nèi)存“爆倉”。
理想的方法應(yīng)該是讀一部分,寫一部分,不管文件有多大,只要時間允許,總會處理完成,這里就需要用到流的概念
流是一個抽象接口,被Node中很多對象所實現(xiàn),比如HTTP服務(wù)器request和response對象都是流Node.js 中有四種基本的流類型:
Readable - 可讀的流 (例如 fs.createReadStream()).
Writable - 可寫的流 (例如 fs.createWriteStream()).
Duplex - 可讀寫的流 (例如 net.Socket).
Transform - 在讀寫過程中可以修改和變換數(shù)據(jù)的 Duplex 流 (例如 zlib.createDeflate()).
可以通過 require("stream") 加載 Stream 基類。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基類Readable streams可讀流
可讀流(Readable streams)是對提供數(shù)據(jù)的 源頭(source)的抽象
可讀流的例子包括:
HTTP responses, on the client :客戶端請求
HTTP requests, on the server :服務(wù)端請求
fs read streams :讀文件
zlib streams :壓縮
crypto streams :加密
TCP sockets :TCP協(xié)議
child process stdout and stderr :子進(jìn)程標(biāo)準(zhǔn)輸出和錯誤輸出
process.stdin :標(biāo)準(zhǔn)輸入
所有的 Readable 都實現(xiàn)了 stream.Readable 類定義的接口通過流讀取數(shù)據(jù)
用Readable創(chuàng)建對象readable后,便得到了一個可讀流
如果實現(xiàn)_read方法,就將流連接到一個底層數(shù)據(jù)源
流通過調(diào)用_read向底層請求數(shù)據(jù),底層再調(diào)用流的push方法將需要的數(shù)據(jù)傳遞過來
當(dāng)readable連接了數(shù)據(jù)源后,下游便可以調(diào)用readable.read(n)向流請求數(shù)據(jù),同時監(jiān)聽readable的data事件來接收取到的數(shù)據(jù)
下面簡單舉個可讀流的例子:
監(jiān)聽可讀流的data事件,當(dāng)你一旦開始監(jiān)聽data事件的時候,流就可以讀文件的內(nèi)容并且發(fā)射data,讀一點發(fā)射一點讀一點發(fā)射一點
默認(rèn)情況下,當(dāng)你監(jiān)聽data事件之后,會不停的讀數(shù)據(jù),然后觸發(fā)data事件,觸發(fā)完data事件后再次讀數(shù)據(jù)
讀的時候不是把文件整體內(nèi)容讀出來再發(fā)射出來的,而且設(shè)置一個緩沖區(qū),大小默認(rèn)是64K,比如文件是128K,先讀64K發(fā)射出來,再讀64K在發(fā)射出來,會發(fā)射兩次
緩沖區(qū)的大小可以通過highWaterMark來設(shè)置
let fs = require("fs"); //通過創(chuàng)建一個可讀流 let rs = fs.createReadStream("./1.txt",{ flags:"r",//我們要對文件進(jìn)行何種操作 mode:0o666,//權(quán)限位 encoding:"utf8",//不傳默認(rèn)為buffer,顯示為字符串 start:3,//從索引為3的位置開始讀 //這是我的見過唯一一個包括結(jié)束索引的 end:8,//讀到索引為8結(jié)束 highWaterMark:3//緩沖區(qū)大小 }); rs.on("open",function () { console.log("文件打開"); }); rs.setEncoding("utf8");//顯示為字符串 //希望流有一個暫停和恢復(fù)觸發(fā)的機制 rs.on("data",function (data) { console.log(data); rs.pause();//暫停讀取和發(fā)射data事件 setTimeout(function(){ rs.resume();//恢復(fù)讀取并觸發(fā)data事件 },2000); }); //如果讀取文件出錯了,會觸發(fā)error事件 rs.on("error",function () { console.log("error"); }); //如果文件的內(nèi)容讀完了,會觸發(fā)end事件 rs.on("end",function () { console.log("讀完了"); }); rs.on("close",function () { console.log("文件關(guān)閉"); }); /** 文件打開 334 455 讀完了 文件關(guān)閉 **/可讀流的簡單實現(xiàn)
let fs = require("fs"); let ReadStream = require("./ReadStream"); let rs = ReadStream("./1.txt", { flags: "r", encoding: "utf8", start: 3, end: 7, highWaterMark: 3 }); rs.on("open", function () { console.log("open"); }); rs.on("data", function (data) { console.log(data); }); rs.on("end", function () { console.log("end"); }); rs.on("close", function () { console.log("close"); }); /** open 456 789 end close **/
let fs = require("fs"); let EventEmitter = require("events"); class ReadStream extends EventEmitter { constructor(path, options) { super(path, options); this.path = path; this.highWaterMark = options.highWaterMark || 64 * 1024; this.buffer = Buffer.alloc(this.highWaterMark); this.flags = options.flags || "r"; this.encoding = options.encoding; this.mode = options.mode || 0o666; this.start = options.start || 0; this.end = options.end; this.pos = this.start; this.autoClose = options.autoClose || true; this.bytesRead = 0; this.closed = false; this.flowing; this.needReadable = false; this.length = 0; this.buffers = []; this.on("end", function () { if (this.autoClose) { this.destroy(); } }); this.on("newListener", (type) => { if (type == "data") { this.flowing = true; this.read(); } if (type == "readable") { this.read(0); } }); this.open(); } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { if (this.autoClose) { this.destroy(); return this.emit("error", err); } } this.fd = fd; this.emit("open"); }); } read(n) { if (typeof this.fd != "number") { return this.once("open", () => this.read()); } n = parseInt(n, 10); if (n != n) { n = this.length; } if (this.length == 0) this.needReadable = true; let ret; if (0 < n < this.length) { ret = Buffer.alloc(n); let b; let index = 0; while (null != (b = this.buffers.shift())) { for (let i = 0; i < b.length; i++) { ret[index++] = b[i]; if (index == ret.length) { this.length -= n; b = b.slice(i + 1); this.buffers.unshift(b); break; } } } if (this.encoding) ret = ret.toString(this.encoding); } let _read = () => { let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark; fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => { if (err) { return } let data; if (bytesRead > 0) { data = this.buffer.slice(0, bytesRead); this.pos += bytesRead; this.length += bytesRead; if (this.end && this.pos > this.end) { if (this.needReadable) { this.emit("readable"); } this.emit("end"); } else { this.buffers.push(data); if (this.needReadable) { this.emit("readable"); this.needReadable = false; } } } else { if (this.needReadable) { this.emit("readable"); } return this.emit("end"); } }) } if (this.length == 0 || (this.length < this.highWaterMark)) { _read(0); } return ret; } destroy() { fs.close(this.fd, (err) => { this.emit("close"); }); } pause() { this.flowing = false; } resume() { this.flowing = true; this.read(); } pipe(dest) { this.on("data", (data) => { let flag = dest.write(data); if (!flag) this.pause(); }); dest.on("drain", () => { this.resume(); }); this.on("end", () => { dest.end(); }); } } module.exports = ReadStream;自定義可讀流
為了實現(xiàn)可讀流,引用Readable接口并用它構(gòu)造新對象
我們可以直接把供使用的數(shù)據(jù)push出去。
當(dāng)push一個null對象就意味著我們想發(fā)出信號——這個流沒有更多數(shù)據(jù)了
var stream = require("stream"); var util = require("util"); util.inherits(Counter, stream.Readable); function Counter(options) { stream.Readable.call(this, options); this._index = 0; } Counter.prototype._read = function() { if(this._index++<3){ this.push(this._index+""); }else{ this.push(null); } }; var counter = new Counter(); counter.on("data", function(data){ console.log("讀到數(shù)據(jù): " + data.toString());//no maybe }); counter.on("end", function(data){ console.log("讀完了"); });可讀流的兩種模式
Readable Stream 存在兩種模式(flowing mode 與 paused mode),這兩種模式?jīng)Q定了chunk數(shù)據(jù)流動的方式---自動流動還是手工流動。那如何觸發(fā)這兩種模式呢:
flowing mode: 注冊事件data、調(diào)用resume方法、調(diào)用pipe方法
paused mode: 調(diào)用pause方法(沒有pipe方法)、移除data事件 && unpipe所有pipe
如果 Readable 切換到 flowing 模式,且沒有消費者處理流中的數(shù)據(jù),這些數(shù)據(jù)將會丟失。 比如, 調(diào)用了 readable.resume() 方法卻沒有監(jiān)聽 "data" 事件,或是取消了 "data" 事件監(jiān)聽,就有可能出現(xiàn)這種情況
可讀流的三種狀態(tài)
在任意時刻,任意可讀流應(yīng)確切處于下面三種狀態(tài)之一:
readable._readableState.flowing = null
readable._readableState.flowing = false
readable._readableState.flowing = true
兩種模式取決于可讀流flowing狀態(tài):
若為true : flowing mode;
若為false : paused mode
flowing mode
通過注冊data、pipe、resume可以自動獲取所需要的數(shù)據(jù),我們來看下源碼的實現(xiàn)
// data事件觸發(fā)flowing mode if (ev === "data") { // Start flowing on next tick if stream isn"t explicitly paused if (this._readableState.flowing !== false) this.resume(); } else if (ev === "readable") { const state = this._readableState; if (!state.endEmitted && !state.readableListening) { state.readableListening = state.needReadable = true; state.emittedReadable = false; if (!state.reading) { process.nextTick(nReadingNextTick, this); } else if (state.length) { emitReadable(this); } } } // resume觸發(fā)flowing mode Readable.prototype.resume = function() { var state = this._readableState; if (!state.flowing) { debug("resume"); state.flowing = true; resume(this, state); } return this; } // pipe方法觸發(fā)flowing模式 Readable.prototype.resume = function() { if (!state.flowing) { this.resume() } }
flowing mode的三種方法最后均是通過resume方法,將狀態(tài)變?yōu)閠rue:state.flowing = true
paused mode
在paused mode下,需要手動地讀取數(shù)據(jù),并且可以直接指定讀取數(shù)據(jù)的長度
可以通過監(jiān)聽事件readable,觸發(fā)時手工讀取chunk數(shù)據(jù):
當(dāng)你監(jiān)聽 readable事件的時候,會進(jìn)入暫停模式
當(dāng)監(jiān)聽readable事件的時候,可讀流會馬上去向底層讀取文件,然后把讀到文件的文件放在緩存區(qū)里const state = this._readableState;
self.read(0); 只填充緩存,但是并不會發(fā)射data事件,但是會發(fā)射stream.emit("readable");事件
this._read(state.highWaterMark); 每次調(diào)用底層的方法讀取的時候是讀取3個字節(jié)
let fs = require("fs"); let rs = fs.createReadStream("./1.txt",{ highWaterMark:3 }); rs.on("readable",function(){ console.log(rs._readableState.length); //read如果不加參數(shù)表示讀取整個緩存區(qū)數(shù)據(jù) //讀取一個字段,如果可讀流發(fā)現(xiàn)你要讀的字節(jié)小于等于緩存字節(jié)大小,則直接返回 let chunk = rs.read(1); console.log(chunk); console.log(rs._readableState.length); //當(dāng)你讀完指定的字節(jié)后,如果可讀流發(fā)現(xiàn)剩下的字節(jié)已經(jīng)比最高水位線小了。則會立馬再次讀取填滿 最高水位線 setTimeout(function(){ console.log(rs._readableState.length); },200) });
注意:一旦注冊了readable事件,必須手工讀取read數(shù)據(jù),否則數(shù)據(jù)就會流失,我們來看下源碼的實現(xiàn)
function emitReadable(stream) { var state = stream._readableState; state.needReadable = false; if (!state.emittedReadable) { debug("emitReadable", state.flowing); state.emittedReadable = true; process.nextTick(emitReadable_, stream); } } function emitReadable_(stream) { var state = stream._readableState; debug("emit readable"); if (!state.destroyed && (state.length || state.ended)) { stream.emit("readable"); } state.needReadable = !state.flowing && !state.ended; flow(stream); } function flow(stream) { const state = stream._readableState; debug("flow", state.flowing); while (state.flowing && stream.read() !== null); } function endReadable(stream) { var state = stream._readableState; debug("endReadable", state.endEmitted); if (!state.endEmitted) { state.ended = true; process.nextTick(endReadableNT, state, stream); } } Readable.prototype.read = function(n) { debug("read", n); n = parseInt(n, 10); var state = this._readableState; var nOrig = n; if (n !== 0) state.emittedReadable = false; if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { debug("read: emitReadable", state.length, state.ended); if (state.length === 0 && state.ended) endReadable(this); else emitReadable(this); return null; } n = howMuchToRead(n, state); if (n === 0 && state.ended) { if (state.length === 0) endReadable(this); return null; }
flow方法直接read數(shù)據(jù),將得到的數(shù)據(jù)通過事件data交付出去,然而此處沒有注冊data事件監(jiān)控,因此,得到的chunk數(shù)據(jù)并沒有交付給任何對象,這樣數(shù)據(jù)就白白流失了,所以在觸發(fā)emit("readable")時,需要提前read數(shù)據(jù)Writable streams可寫流
可寫流是對數(shù)據(jù)寫入"目的地"的一種抽象
Writable:可寫流的例子包括了:
HTTP requests, on the client 客戶端請求
HTTP responses, on the server 服務(wù)器響應(yīng)
fs write streams 文件
zlib streams 壓縮
crypto streams 加密
TCP sockets TCP服務(wù)器
child process stdin 子進(jìn)程標(biāo)準(zhǔn)輸入
process.stdout, process.stderr 標(biāo)準(zhǔn)輸出,錯誤輸出
下面舉個可寫流的簡單例子
當(dāng)你往可寫流里寫數(shù)據(jù)的時候,不是會立刻寫入文件的,而是會很寫入緩存區(qū),緩存區(qū)的大小就是highWaterMark,默認(rèn)值是16K。然后等緩存區(qū)滿了之后再次真正的寫入文件里
let fs = require("fs"); let ws = fs.createWriteStream("./2.txt",{ flags:"w", mode:0o666, start:3, highWaterMark:3//默認(rèn)是16K });
如果緩存區(qū)已滿 ,返回false,如果緩存區(qū)未滿,返回true
如果能接著寫,返回true,如果不能接著寫,返回false
按理說如果返回了false,就不能再往里面寫了,但是如果你真寫了,如果也不會丟失,會緩存在內(nèi)存里。等緩存區(qū)清空之后再從內(nèi)存里讀出來
let flag = ws.write("1"); console.log(flag);//true flag =ws.write("2"); console.log(flag);//true flag =ws.write("3"); console.log(flag);//false flag =ws.write("4"); console.log(flag);//false
"drain" 事件
如果調(diào)用 stream.write(chunk) 方法返回 false,流將在適當(dāng)?shù)臅r機觸發(fā) "drain" 事件,這時才可以繼續(xù)向流中寫入數(shù)據(jù)當(dāng)一個流不處在 drain 的狀態(tài), 對 write() 的調(diào)用會緩存數(shù)據(jù)塊, 并且返回 false。 一旦所有當(dāng)前所有緩存的數(shù)據(jù)塊都排空了(被操作系統(tǒng)接受來進(jìn)行輸出), 那么 "drain" 事件就會被觸發(fā)
建議, 一旦 write() 返回 false, 在 "drain" 事件觸發(fā)前, 不能寫入任何數(shù)據(jù)塊
舉個簡單的例子說明一下:
let fs = require("fs"); let ws = fs.createWriteStream("2.txt",{ flags:"w", mode:0o666, start:0, highWaterMark:3 }); let count = 9; function write(){ let flag = true;//緩存區(qū)未滿 //寫入方法是同步的,但是寫入文件的過程是異步的。在真正寫入文件后還會執(zhí)行我們的回調(diào)函數(shù) while(flag && count>0){ console.log("before",count); flag = ws.write((count)+"","utf8",(function (i) { return ()=>console.log("after",i); })(count)); count--; } } write();//987 //監(jiān)聽緩存區(qū)清空事件 ws.on("drain",function () { console.log("drain"); write();//654 321 }); ws.on("error",function (err) { console.log(err); }); /** before 9 before 8 before 7 after 9 after 8 after 7 **/
如果已經(jīng)不再需要寫入了,可以調(diào)用end方法關(guān)閉寫入流,一旦調(diào)用end方法之后則不能再寫入
比如在ws.end();后寫ws.write("x");,會報錯write after end
"pipe"事件
linux精典的管道的概念,前者的輸出是后者的輸入pipe是一種最簡單直接的方法連接兩個stream,內(nèi)部實現(xiàn)了數(shù)據(jù)傳遞的整個過程,在開發(fā)的時候不需要關(guān)注內(nèi)部數(shù)據(jù)的流動
這個方法從可讀流拉取所有數(shù)據(jù), 并將數(shù)據(jù)寫入到提供的目標(biāo)中
自動管理流量,將數(shù)據(jù)的滯留量限制到一個可接受的水平,以使得不同速度的來源和目標(biāo)不會淹沒可用內(nèi)存
默認(rèn)情況下,當(dāng)源數(shù)據(jù)流觸發(fā) end的時候調(diào)用end(),所以寫入數(shù)據(jù)的目標(biāo)不可再寫。傳 { end:false }作為options,可以保持目標(biāo)流打開狀態(tài)
pipe方法的原理
var fs = require("fs"); var ws = fs.createWriteStream("./2.txt"); var rs = fs.createReadStream("./1.txt"); rs.on("data", function (data) { var flag = ws.write(data); if(!flag) rs.pause(); }); ws.on("drain", function () { rs.resume(); }); rs.on("end", function () { ws.end(); });
下面舉個簡單的例子說明一下pipe的用法:
let fs = require("fs"); let rs = fs.createReadStream("./1.txt",{ highWaterMark:3 }); let ws = fs.createWriteStream("./2.txt",{ highWaterMark:3 }); rs.pipe(ws); //移除目標(biāo)可寫流 rs.unpipe(ws);
當(dāng)監(jiān)聽可讀流data事件的時候會觸發(fā)回調(diào)函數(shù)的執(zhí)行
可以實現(xiàn)數(shù)據(jù)的生產(chǎn)者和消費者速度的均衡
rs.on("data",function (data) { console.log(data); let flag = ws.write(data); if(!flag){ rs.pause(); } });
監(jiān)聽可寫流緩存區(qū)清空事件,當(dāng)所有要寫入的數(shù)據(jù)寫入完成后,接著恢復(fù)從可讀流里讀取并觸發(fā)data事件
ws.on("drain",function () { console.log("drain"); rs.resume(); });
unpipe
readable.unpipe()方法將之前通過stream.pipe()方法綁定的流分離
如果寫入的目標(biāo)沒有傳入, 則所有綁定的流都會被分離
如果指定了寫入的目標(biāo),但是沒有綁定流,則什么事情都不會發(fā)生
簡單距離說明下unpipe的用法:
let fs = require("fs"); var from = fs.createReadStream("./1.txt"); var to = fs.createWriteStream("./2.txt"); from.pipe(to); setTimeout(() => { console.log("關(guān)閉向2.txt的寫入"); from.unpipe(writable); console.log("手工關(guān)閉文件流"); to.end(); }, 1000);pipe的簡單實現(xiàn)
let fs = require("fs"); let ReadStream = require("./ReadStream"); let rs = ReadStream("./1.txt", { flags: "r", encoding: "utf8", highWaterMark: 3 }); let FileWriteStream = require("./WriteStream"); let ws = FileWriteStream("./2.txt",{ flags:"w", encoding:"utf8", highWaterMark:3 }); rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) { this.on("data", (data)=>{ let flag = dest.write(data); if(!flag){ this.pause(); } }); dest.on("drain", ()=>{ this.resume(); }); this.on("end", ()=>{ dest.end(); }); } ReadStream.prototype.pause = function(){ this.flowing = false; } ReadStream.prototype.resume = function(){ this.flowing = true; this.read(); }自定義管道流
const stream = require("stream") var index = 0; const readable = stream.Readable({ highWaterMark: 2, read: function () { process.nextTick(() => { console.log("push", ++index) this.push(index+""); }) } }) const writable = stream.Writable({ highWaterMark: 2, write: function (chunk, encoding, next) { console.log("寫入:", chunk.toString()) } }) readable.pipe(writable);可寫流的簡單實現(xiàn)
let fs = require("fs"); let FileWriteStream = require("./FileWriteStream"); let ws = FileWriteStream("./2.txt",{ flags:"w", encoding:"utf8", highWaterMark:3 }); let i = 10; function write(){ let flag = true; while(i&&flag){ flag = ws.write("1","utf8",(function(i){ return function(){ console.log(i); } })(i)); i--; console.log(flag); } } write(); ws.on("drain",()=>{ console.log("drain"); write(); }); /** 10 9 8 drain 7 6 5 drain 4 3 2 drain 1 **/
let EventEmitter = require("events"); let util = require("util"); let fs = require("fs"); util.inherits(WriteStream, EventEmitter); function WriteStream(path, options) { EventEmitter.call(this); if (!(this instanceof WriteStream)) { return new WriteStream(path, options); } this.path = path; this.fd = options.fd; this.encoding = options.encoding||"utf8"; this.flags = options.flags || "w"; this.mode = options.mode || 0o666; this.autoClose = options.autoClose || true; this.start = options.start || 0; this.pos = this.start;//開始寫入的索引位置 this.open();//打開文件進(jìn)行操作 this.writing = false;//沒有在寫入過程 中 this.buffers = []; this.highWaterMark = options.highWaterMark||16*1024; //如果監(jiān)聽到end事件,而且要求自動關(guān)閉的話則關(guān)閉文件 this.on("end", function () { if (this.autoClose) { this.destroy() } }); } WriteStream.prototype.close = function(){ fs.close(this.fd,(err)=>{ if(err) this.emit("error",err); }); } WriteStream.prototype.open = function () { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) return this.emit("error", err); this.fd = fd;//把文件描述符賦給當(dāng)前實例的fd屬性 //發(fā)射open事件 this.emit("open", fd); }); } /** * 會判斷當(dāng)前是后臺是否在寫入過程中,如果在寫入過程中,則把這個數(shù)據(jù)放在待處理的緩存中,如果不在寫入過程中,可以直接寫。 */ WriteStream.prototype.write = function (chunk, encoding, cb) { chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding); //先把數(shù)據(jù)放在緩存里 this.buffers.push({ chunk, encoding, cb }); let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark; //只有當(dāng)緩存區(qū)寫滿了,那么清空緩存區(qū)的時候才會發(fā)射drain事件,否則 不發(fā)放 this.needDrain = isFull; //如果說文件還沒有打開,則把寫入的方法壓入open事件的監(jiān)聽函數(shù)。等文件一旦打開,立刻執(zhí)行寫入操作 if (typeof this.fd !== "number") { this.once("open", () => { this._write(); }); return !isFull; }else{ if(!this.writing){ setImmediate(()=>{ this._write(); this.writing = true; }); } return !isFull; } } WriteStream.prototype._write = function () { let part = this.buffers.shift(); if (part) { fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{ if(err)return this.emit("error",err); part.cb && part.cb(); this._write(); }); }else{ //發(fā)射一個緩存區(qū)清空的事件 this.emit("drain"); this.writing = false; } } module.exports = WriteStream;自定義可寫流
為了實現(xiàn)可寫流,我們需要使用流模塊中的Writable構(gòu)造函數(shù)。 我們只需給Writable構(gòu)造函數(shù)傳遞一些選項并創(chuàng)建一個對象。唯一需要的選項是write函數(shù),該函數(shù)揭露數(shù)據(jù)塊要往哪里寫
chunk通常是一個buffer,除非我們配置不同的流。
encoding是在特定情況下需要的參數(shù),通常我們可以忽略它。
callback是在完成處理數(shù)據(jù)塊后需要調(diào)用的函數(shù)。這是寫數(shù)據(jù)成功與否的標(biāo)志。若要發(fā)出故障信號,請用錯誤對象調(diào)用回調(diào)函數(shù)
var stream = require("stream"); var util = require("util"); util.inherits(Writer, stream.Writable); let stock = []; function Writer(opt) { stream.Writable.call(this, opt); } Writer.prototype._write = function(chunk, encoding, callback) { setTimeout(()=>{ stock.push(chunk.toString("utf8")); console.log("增加: " + chunk); callback(); },500) }; var w = new Writer(); for (var i=1; i<=5; i++){ w.write("項目:" + i, "utf8"); } w.end("結(jié)束寫入",function(){ console.log(stock); });Duplex streams可讀寫的流(雙工流)
Duplex 流是同時實現(xiàn)了 Readable 和 Writable 接口的流
雙工流的可讀性和可寫性操作完全獨立于彼此,這僅僅是將兩個特性組合成一個對象Duplex 流的實例包括了:
TCP sockets
zlib streams
crypto streams
下面簡單實現(xiàn)雙工流:
const {Duplex} = require("stream"); const inoutStream = new Duplex({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); }, read(size) { this.push((++this.index)+""); if (this.index > 3) { this.push(null); } } }); inoutStream.index = 0; process.stdin.pipe(inoutStream).pipe(process.stdout);Transform streams轉(zhuǎn)換流
變換流(Transform streams) 是一種 Duplex 流。它的輸出與輸入是通過某種方式關(guān)聯(lián)的。和所有 Duplex 流一樣,變換流同時實現(xiàn)了 Readable 和 Writable 接口轉(zhuǎn)換流的輸出是從輸入中計算出來的
對于轉(zhuǎn)換流,我們不必實現(xiàn)read或write的方法,我們只需要實現(xiàn)一個transform方法,將兩者結(jié)合起來。它有write方法的意思,我們也可以用它來push數(shù)據(jù)變換流的實例包括:
zlib streams
crypto streams
下面簡單實現(xiàn)轉(zhuǎn)換流:
const {Transform} = require("stream"); const upperCase = new Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); callback(); } }); process.stdin.pipe(upperCase).pipe(process.stdout);對象流
默認(rèn)情況下,流處理的數(shù)據(jù)是Buffer/String類型的值。有一個objectMode標(biāo)志,我們可以設(shè)置它讓流可以接受任何JavaScript對象
const {Transform} = require("stream"); let fs = require("fs"); let rs = fs.createReadStream("./users.json"); rs.setEncoding("utf8"); let toJson = Transform({ readableObjectMode: true, transform(chunk, encoding, callback) { this.push(JSON.parse(chunk)); callback(); } }); let jsonOut = Transform({ writableObjectMode: true, transform(chunk, encoding, callback) { console.log(chunk); callback(); } }); rs.pipe(toJson).pipe(jsonOut);
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/107224.html
摘要:事件的觸發(fā)頻次同樣是由實現(xiàn)者決定,譬如在進(jìn)行文件讀取時,可能每行都會觸發(fā)一次而在請求處理時,可能數(shù)的數(shù)據(jù)才會觸發(fā)一次。如果有參數(shù)傳入,它會讓可讀流停止流向某個特定的目的地,否則,它會移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文節(jié)選自 Node.js Chea...
摘要:流是基于事件的用于管理和處理數(shù)據(jù)而且有不錯的效率借助事件和非阻塞庫流模塊允許在其可用的時候動態(tài)處理在其不需要的時候釋放掉使用流的好處舉一個讀取文件的例子使用同步讀取一個文件程序會被阻塞所有的數(shù)據(jù)都會被讀取到內(nèi)存中換用讀取文件程序不會被阻塞但 流是基于事件的API,用于管理和處理數(shù)據(jù),而且有不錯的效率.借助事件和非阻塞I/O庫,流模塊允許在其可用的時候動態(tài)處理,在其不需要的時候釋放掉. ...
摘要:當(dāng)接收一個回調(diào)函數(shù)的時候,一定要注意回調(diào)函數(shù)中的參數(shù)。主要作用就是用來讀取文件或者文件夾中的數(shù)據(jù)。表示文件的名稱指的是發(fā)生的變化使用技巧的進(jìn)一步使用,可以參照中文官網(wǎng)中的技巧集。 Gulp 簡介 Gulp 對現(xiàn)在的前端而言,是一個稍微老舊的工具了,但是,為了復(fù)習(xí)以前學(xué)過的內(nèi)容,還是把它翻出來,放在自己的博客中。說不定哪天又用到了呢。 需要說明的是,這里使用的 Gulp 版本是 3.9....
摘要:事件的監(jiān)聽與事件的觸發(fā)事件一事件機制的實現(xiàn)中大部分的模塊,都繼承自模塊。從另一個角度來看,事件偵聽器模式也是一種事件鉤子的機制,利用事件鉤子導(dǎo)出內(nèi)部數(shù)據(jù)或狀態(tài)給外部調(diào)用者。的核心就是事件發(fā)射與事件監(jiān)聽器功能的封裝。 nodejs事件的監(jiān)聽與事件的觸發(fā) nodejs事件(Events)showImg(https://segmentfault.com/img/bV0Sqi?w=692&h=...
摘要:回調(diào)函數(shù)中檢測該次寫入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫流表示該寫入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ?。監(jiān)聽源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫流。 在Node.js中,流(Stream)是其眾多原生對象的基類,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數(shù)據(jù)處理和傳遞。在它對外暴露的接口中,最為神奇的,莫過于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node...
閱讀 1335·2021-11-17 09:33
閱讀 3762·2021-09-28 09:42
閱讀 3550·2021-09-13 10:35
閱讀 2816·2021-09-06 15:00
閱讀 2604·2021-08-27 13:12
閱讀 3739·2021-07-26 23:38
閱讀 2113·2019-08-30 15:55
閱讀 682·2019-08-30 15:53