亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

node那點(diǎn)事(二) -- Writable streams(可寫流)、自定義流

mtunique / 623人閱讀

摘要:可寫流可寫流是對(duì)數(shù)據(jù)寫入目的地的一種抽象。對(duì)象流的特點(diǎn)就是它有一個(gè)標(biāo)志,我們可以設(shè)置它讓流可以接受任何對(duì)象。

可寫流(Writable Stream)

可寫流是對(duì)數(shù)據(jù)寫入"目的地"的一種抽象。

可寫流的原理其實(shí)與可讀流類似,當(dāng)數(shù)據(jù)過(guò)來(lái)的時(shí)候會(huì)寫入緩存池,當(dāng)寫入的速度很慢或者寫入暫停時(shí)候,數(shù)據(jù)流便會(huì)進(jìn)入到隊(duì)列池緩存起來(lái),當(dāng)然即使緩存池滿了,剩余的數(shù)據(jù)也是存在內(nèi)存

可寫流的簡(jiǎn)單用法如下代碼

let fs = require("fs");
let path = require("path");
let ws = fs.createWriteStream(path.join(__dirname,"1.txt"),{
    highWaterMark:3,
    autoClose:true,
    flags:"w",
    encoding:"utf8",
    mode:0o666,
    start:0,
}); 
let i = 9;
function write(){
    let flag = true;
    while(i>0&&flag){
        flag = ws.write(--i+"","utf8",()=>{console.log("ok")});
        console.log(flag)
    }
}
write();
// drain只有當(dāng)緩存區(qū)充滿后 并且被消費(fèi)后觸發(fā)
ws.on("drain",function(){
    console.log("抽干")
    write();
});
實(shí)現(xiàn)原理

現(xiàn)在就讓我們來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的可寫流,來(lái)研究可寫流的內(nèi)部原理,可寫流有很多方法與可讀流類似,這里不在重復(fù)了首先要有一個(gè)構(gòu)造函數(shù)來(lái)定義一些基本選項(xiàng)屬性,然后調(diào)用一個(gè)open放法打開文件,并且有一個(gè)destroy方法來(lái)處理關(guān)閉邏輯

let EventEmitter = require("events");
let fs = require("fs");

class WriteStream extends EventEmitter {
    constructor(path,options) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.autoClose = options.autoClose || true;
        this.mode = options.mode;
        this.start = options.start || 0;
        this.flags = options.flags || "w";
        this.encoding = options.encoding || "utf8";

        // 可寫流 要有一個(gè)緩存區(qū),當(dāng)正在寫入文件是,內(nèi)容要寫入到緩存區(qū)中
        // 在源碼中是一個(gè)鏈表 => []
        this.buffers = [];

        // 標(biāo)識(shí) 是否正在寫入
        this.writing = false;

        // 是否滿足觸發(fā)drain事件
        this.needDrain = false;

        // 記錄寫入的位置
        this.pos = 0;

        // 記錄緩存區(qū)的大小
        this.length = 0;
        this.open();
    }
    
    destroy() {
        if (typeof this.fd !== "number") {
            return this.emit("close");
        }
        fs.close(this.fd, () => {
            this.emit("close")
        });
    }
    
    open() {
        fs.open(this.path, this.flags, this.mode, (err,fd) => {
            if (err) {
                this.emit("error", err);
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            this.fd = fd;
            this.emit("open");
        })
    }
}

module.exports = WriteStream;

接著我們實(shí)現(xiàn)write方法來(lái)讓可寫流對(duì)象調(diào)用,在write方法中我們首先將數(shù)據(jù)轉(zhuǎn)化為buffer,接著實(shí)現(xiàn)一些事件的觸發(fā)條件的邏輯,如果現(xiàn)在沒(méi)有正在寫入的話我們就要真正的進(jìn)行寫入操作了,這里我們實(shí)現(xiàn)一個(gè)_write方法來(lái)實(shí)現(xiàn)寫入操作,否則則代表文件正在寫入,那我們就將流傳來(lái)的數(shù)據(jù)先放在緩存區(qū)中,保證寫入數(shù)據(jù)不會(huì)同時(shí)進(jìn)行。

write(chunk,encoding=this.encoding,callback=()=>{}){
    chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
    // write 返回一個(gè)boolean類型 
    this.length+=chunk.length; 
    let ret = this.length{
            callback();
            this.clearBuffer();
        }); // 8
    }
    return ret;
}

_write(chunk,encoding,callback){
    if(typeof this.fd !== "number"){
        return this.once("open",()=>this._write(chunk,encoding,callback));
    }
    fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
        this.length -= byteWritten;
        this.pos += byteWritten;
        
        callback(); // 清空緩存區(qū)的內(nèi)容
    });
}
    

_write寫入之后的回調(diào)中我們會(huì)調(diào)用傳入回調(diào)函數(shù)clearBuffer,這個(gè)方法會(huì)去buffers中繼續(xù)遞歸地把數(shù)據(jù)取出,然后繼續(xù)調(diào)用_write方法去寫入,直到全部buffer中的數(shù)據(jù)取出后,這樣就清空了buffers。

clearBuffer(){
    let buffer = this.buffers.shift();
    if(buffer){
        this._write(buffer.chunk,buffer.encoding,()=>{
            buffer.callback();
            this.clearBuffer()
        });
    }else{
        this.writing = false;
        if(this.needDrain){ // 是否需要觸發(fā)drain 需要就發(fā)射drain事件
            this.needDrain = false;
            this.emit("drain");
        }
    }
}

最后附上完整的代碼

let EventEmitter = require("events");
let fs = require("fs");
class WriteStream extends EventEmitter{
    constructor(path,options){
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark||16*1024;
        this.autoClose = options.autoClose||true;
        this.mode = options.mode;
        this.start = options.start||0;
        this.flags = options.flags||"w";
        this.encoding = options.encoding || "utf8";

        // 可寫流 要有一個(gè)緩存區(qū),當(dāng)正在寫入文件是,內(nèi)容要寫入到緩存區(qū)中
        // 在源碼中是一個(gè)鏈表 => []

        this.buffers = [];

        // 標(biāo)識(shí) 是否正在寫入
        this.writing = false;

        // 是否滿足觸發(fā)drain事件
        this.needDrain = false;

        // 記錄寫入的位置
        this.pos = 0;

        // 記錄緩存區(qū)的大小
        this.length = 0;
        this.open();
    }
    destroy(){
        if(typeof this.fd !=="number"){
            return this.emit("close");
        }
        fs.close(this.fd,()=>{
            this.emit("close")
        })
    }
    open(){
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
            if(err){
                this.emit("error",err);
                if(this.autoClose){
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit("open");
        })
    }
    write(chunk,encoding=this.encoding,callback=()=>{}){
        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
        // write 返回一個(gè)boolean類型 
        this.length+=chunk.length; 
        let ret = this.length{
                callback();
                this.clearBuffer();
            }); // 8
        }
        return ret;
    }
    clearBuffer(){
        let buffer = this.buffers.shift();
        if(buffer){
            this._write(buffer.chunk,buffer.encoding,()=>{
                buffer.callback();
                this.clearBuffer()
            });
        }else{
            this.writing = false;
            if(this.needDrain){ // 是否需要觸發(fā)drain 需要就發(fā)射drain事件
                this.needDrain = false;
                this.emit("drain");
            }
        }
    }
    _write(chunk,encoding,callback){
        if(typeof this.fd !== "number"){
            return this.once("open",()=>this._write(chunk,encoding,callback));
        }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
            this.length -= byteWritten;
            this.pos += byteWritten;
            
            callback(); // 清空緩存區(qū)的內(nèi)容
        });
    }
}

module.exports = WriteStream;
Pipe管道流

前面我們了解了可讀流與可寫流,那么怎么讓二者結(jié)合起來(lái)使用呢,node給我們提供好了方法--Pipe管道,流顧名思義,就是在可讀流與可寫流中間加入一個(gè)管道,實(shí)現(xiàn)一邊讀取,一邊寫入,讀一點(diǎn)寫一點(diǎn)。

Pipe的使用方法如下

let fs = require("fs");
let path = require("path");
let ReadStream = require("./ReadStream");
let WriteStream = require("./WriteStream");

let rs = new ReadStream(path.join(__dirname, "./1.txt"), {
    highWaterMark: 4
});
let ws = new WriteStream(path.join(__dirname, "./2.txt"), {
    highWaterMark: 1
});
// 4 1
rs.pipe(ws); 
實(shí)現(xiàn)原理

Pipe的原理比較簡(jiǎn)單,簡(jiǎn)單說(shuō)監(jiān)聽可讀流的data事件來(lái)持續(xù)獲取文件中的數(shù)據(jù),然后我們就會(huì)去調(diào)用寫流的write方法。如果可寫流緩存區(qū)已滿,那么當(dāng)我們得到調(diào)用可讀流的pause方法來(lái)暫停讀取,然后等到寫流的緩存區(qū)已經(jīng)全部寫入并且觸發(fā)drain事件時(shí),我們就會(huì)調(diào)用resume重新開啟讀取的流程。上代碼

pipe(ws) {
    this.on("data", (chunk) => {
        let flag = ws.write(chunk);
        if (!flag) {
            this.pause();
        }
    });
    ws.on("drain", () => {
        this.resume();
    })
}
自定義流

Node允許我們自定義流,讀流繼承于Readable接口,寫流則繼承于Writable接口,所以我們其實(shí)是可以自定義一個(gè)流模塊,只要繼承stream模塊對(duì)應(yīng)的接口即可。

自定義可讀流

如果我們要自定義讀流的話,那我們就需要繼承Readable,Readable里面有一個(gè)read()方法,默認(rèn)調(diào)用_read(),所以我們只要復(fù)寫了_read()方法就可實(shí)現(xiàn)讀取的邏輯,同時(shí)Readable中也提供了一個(gè)push方法,調(diào)用push方法就會(huì)觸發(fā)data事件,push中的參數(shù)就是data事件回調(diào)函數(shù)的參數(shù),當(dāng)push傳入的參數(shù)為null的時(shí)候就代表讀流停止,上代碼

let { Readable } = require("stream");

// 想實(shí)現(xiàn)什么流 就繼承這個(gè)流
// Readable里面有一個(gè)read()方法,默認(rèn)掉_read()
// Readable中提供了一個(gè)push方法你調(diào)用push方法就會(huì)觸發(fā)data事件
let index = 9;
class MyRead extends Readable {
    _read() {
        // 可讀流什么時(shí)候停止呢? 當(dāng)push null的時(shí)候停止
        if (index-- > 0) return this.push("123");
        this.push(null);
    }
}

let mr = new MyRead();
mr.on("data", function(data) {
    console.log(data);
});
自定義可寫流

與自定義讀流類似,自定義寫流需要繼承Writable接口,并且實(shí)現(xiàn)一個(gè)_write()方法,這里注意的是_write中可以傳入3個(gè)參數(shù),chunk, encoding, callback,chunk就是代表寫入的數(shù)據(jù),通常是一個(gè)buffer,encoding是編碼類型,通常不會(huì)用到,最后的callback要注意,它并不是我們用這個(gè)自定義寫流調(diào)用write時(shí)的回調(diào),而是我們上面講到寫流實(shí)現(xiàn)時(shí)的clearBuffer函數(shù)。

let { Writable } = require("stream");

// 可寫流實(shí)現(xiàn)_write方法
// 源碼中默認(rèn)調(diào)用的是Writable中的write方法
class MyWrite extends Writable {
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback(); // clearBuffer
    }
}

let mw = new MyWrite();
mw.write("111", "utf8", () => {
    console.log(1);
})
mw.write("222", "utf8", () => {
    console.log(1);
});
Duplex 雙工流

雙工流其實(shí)就是結(jié)合了上面我們說(shuō)的自定義讀流和自定義寫流,它既能讀也能寫,同時(shí)可以做到讀寫之間互不干擾

let { Duplex } =  require("stream");

// 雙工流 又能讀 又能寫,而且讀取可以沒(méi)關(guān)系(互不干擾)
let d = Duplex({
    read() {
        this.push("hello");
        this.push(null);
    },
    write(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});

d.on("data", function(data) {
    console.log(data);
});
d.write("hello");
Transform 轉(zhuǎn)換流

轉(zhuǎn)換流的本質(zhì)就是雙工流,唯一不同的是它并不需要像上面提到的雙工流一樣實(shí)現(xiàn)read和write,它只需要實(shí)現(xiàn)一個(gè)transform方法用于轉(zhuǎn)換

let { Transform } =  require("stream");

// 它的參數(shù)和可寫流一樣
let tranform1 = Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase()); // 將輸入的內(nèi)容放入到可讀流中
        callback();
    }
});
let tranform2 = Transform({
    transform(chunk, encoding, callback){
        console.log(chunk.toString());
        callback();
    }
});

// 等待你的輸入
// rs.pipe(ws);
// 希望將輸入的內(nèi)容轉(zhuǎn)化成大寫在輸出出來(lái)
process.stdin.pipe(tranform1).pipe(tranform2);
// 對(duì)象流 可讀流里只能放buffer或者字符串 對(duì)象流里可以放對(duì)象
對(duì)象流

默認(rèn)情況下,流處理的數(shù)據(jù)是Buffer/String類型的值。對(duì)象流的特點(diǎn)就是它有一個(gè)objectMode標(biāo)志,我們可以設(shè)置它讓流可以接受任何JavaScript對(duì)象。上代碼

const { Transform } = require("stream");

let fs = require("fs");
let rs = fs.createReadStream("./users.json");

rs.setEncoding("utf8");

let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});

let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/94289.html

相關(guān)文章

  • 淺談node.js中的stream()

    摘要:在可讀流事件里我們就必須調(diào)用方法。當(dāng)一個(gè)對(duì)象就意味著我們想發(fā)出信號(hào)這個(gè)流沒(méi)有更多數(shù)據(jù)了自定義可寫流為了實(shí)現(xiàn)可寫流,我們需要使用流模塊中的構(gòu)造函數(shù)。我們只需給構(gòu)造函數(shù)傳遞一些選項(xiàng)并創(chuàng)建一個(gè)對(duì)象。 前言 什么是流呢?看字面意思,我們可能會(huì)想起生活中的水流,電流。但是流不是水也不是電,它只是描述水和電的流動(dòng);所以說(shuō)流是抽象的。在node.js中流是一個(gè)抽象接口,它不關(guān)心文件內(nèi)容,只關(guān)注是否從...

    elliott_hu 評(píng)論0 收藏0
  • 通過(guò)源碼解析 Node.js 中導(dǎo)(pipe)的實(shí)現(xiàn)

    摘要:回調(diào)函數(shù)中檢測(cè)該次寫入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫流表示該寫入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ?。監(jiān)聽源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫流。 在Node.js中,流(Stream)是其眾多原生對(duì)象的基類,它對(duì)處理潛在的大文件提供了支持,也抽象了一些場(chǎng)景下的數(shù)據(jù)處理和傳遞。在它對(duì)外暴露的接口中,最為神奇的,莫過(guò)于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node...

    defcon 評(píng)論0 收藏0
  • [譯]關(guān)于Node.js streams你需要知道的一切

    摘要:當(dāng)一個(gè)客戶端的響應(yīng)對(duì)象是一個(gè)可讀流,那么在服務(wù)器端這就是一個(gè)可寫流。的模塊給我們提供了一個(gè)可以操作任何文件的可讀流通過(guò)方法創(chuàng)建。創(chuàng)建一個(gè)可讀流創(chuàng)建可讀流,我們需要類創(chuàng)建一個(gè)可讀流非常簡(jiǎn)單。可以通過(guò)修改可讀流配置里面的方法實(shí)現(xiàn)。 Node.js的stream模塊是有名的應(yīng)用困難,更別說(shuō)理解了。那現(xiàn)在可以告訴你,這些都不是問(wèn)題了。 多年來(lái),開發(fā)人員在那里創(chuàng)建了大量的軟件包,其唯一目的就是使...

    bang590 評(píng)論0 收藏0
  • Node.js 中操作實(shí)踐

    摘要:事件的觸發(fā)頻次同樣是由實(shí)現(xiàn)者決定,譬如在進(jìn)行文件讀取時(shí),可能每行都會(huì)觸發(fā)一次而在請(qǐng)求處理時(shí),可能數(shù)的數(shù)據(jù)才會(huì)觸發(fā)一次。如果有參數(shù)傳入,它會(huì)讓可讀流停止流向某個(gè)特定的目的地,否則,它會(huì)移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文節(jié)選自 Node.js Chea...

    chaos_G 評(píng)論0 收藏0
  • node點(diǎn)事(一) -- Readable streams(可讀

    摘要:流的類型中有四種基本的流類型可讀的流例如可寫的流例如可讀寫的流例如在讀寫過(guò)程中可以修改和變換數(shù)據(jù)的流例如可讀流可讀流有兩種模式流動(dòng)模式可讀流自動(dòng)讀取數(shù)據(jù),通過(guò)接口的事件盡快將數(shù)據(jù)提供給應(yīng)用。 流的簡(jiǎn)介 流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract interface)。 stream 模塊提供了基礎(chǔ)的 API 。使用這些 API 可以很容易地來(lái)構(gòu)建實(shí)...

    rickchen 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<