摘要:在數(shù)據(jù)緩沖區(qū)已超過或?qū)懭腙?duì)列當(dāng)前正忙的任何情況下,將返回。當(dāng)返回值時(shí),背壓系統(tǒng)啟動(dòng),它會(huì)暫停傳入的流發(fā)送任何數(shù)據(jù),并等待消費(fèi)者再次準(zhǔn)備就緒,清空數(shù)據(jù)緩沖區(qū)后,將發(fā)出事件并恢復(fù)傳入的數(shù)據(jù)流。
流中的背壓
在數(shù)據(jù)處理過程中會(huì)出現(xiàn)一個(gè)叫做背壓的常見問題,它描述了數(shù)據(jù)傳輸過程中緩沖區(qū)后面數(shù)據(jù)的累積,當(dāng)傳輸?shù)慕邮斩司哂袕?fù)雜的操作時(shí),或者由于某種原因速度較慢時(shí),來自傳入源的數(shù)據(jù)就有累積的趨勢(shì),就像阻塞一樣。
要解決這個(gè)問題,必須有一個(gè)委托系統(tǒng)來確保數(shù)據(jù)從一個(gè)源到另一個(gè)源的平滑流動(dòng),不同的社區(qū)已經(jīng)針對(duì)他們的程序獨(dú)特地解決了這個(gè)問題,Unix管道和TCP套接字就是很好的例子,并且通常被稱為流量控制,在Node.js中,流是已采用的解決方案。
本指南的目的是進(jìn)一步詳細(xì)說明背壓是什么,以及精確流如何在Node.js的源代碼中解決這個(gè)問題,本指南的第二部分將介紹建議的最佳實(shí)踐,以確保在實(shí)現(xiàn)流時(shí)應(yīng)用程序的代碼是安全的和優(yōu)化的。
我們假設(shè)你對(duì)Node.js中背壓、Buffer和EventEmitter的一般定義以及Stream的一些經(jīng)驗(yàn)有所了解。如果你還沒有閱讀這些文檔,那么首先查看API文檔并不是一個(gè)壞主意,因?yàn)樗兄谠陂喿x本指南時(shí)擴(kuò)展你的理解。
數(shù)據(jù)處理的問題在計(jì)算機(jī)系統(tǒng)中,數(shù)據(jù)通過管道、sockets和信號(hào)從一個(gè)進(jìn)程傳輸?shù)搅硪粋€(gè)進(jìn)程,在Node.js中,我們找到了一種名為Stream的類似機(jī)制。流很好!他們?yōu)镹ode.js做了很多事情,幾乎內(nèi)部代碼庫的每個(gè)部分都使用該模塊,作為開發(fā)人員,我們鼓勵(lì)你使用它們!
const readline = require("readline"); // process.stdin and process.stdout are both instances of Streams const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); rl.question("Why should you use streams? ", (answer) => { console.log(`Maybe it"s ${answer}, maybe it"s because they are awesome! :)`); rl.close(); });
通過比較Node.js的Stream實(shí)現(xiàn)的內(nèi)部系統(tǒng)工具,可以證明為什么通過流實(shí)現(xiàn)背壓機(jī)制是一個(gè)很好的優(yōu)化的一個(gè)很好的例子。
在一種情況下,我們將使用一個(gè)大文件(約?9gb)并使用熟悉的zip(1)工具對(duì)其進(jìn)行壓縮。
$ zip The.Matrix.1080p.mkv
雖然這需要幾分鐘才能完成,但在另一個(gè)shell中我們可以運(yùn)行一個(gè)腳本,該腳本采用Node.js的模塊zlib,它包含另一個(gè)壓縮工具gzip(1)。
const gzip = require("zlib").createGzip(); const fs = require("fs"); const inp = fs.createReadStream("The.Matrix.1080p.mkv"); const out = fs.createWriteStream("The.Matrix.1080p.mkv.gz"); inp.pipe(gzip).pipe(out);
要測(cè)試結(jié)果,請(qǐng)嘗試打開每個(gè)壓縮文件,zip(1)工具壓縮的文件將通知你文件已損壞,而Stream完成的壓縮將無錯(cuò)誤地解壓縮。
注意:在此示例中,我們使用.pipe()將數(shù)據(jù)源從一端獲取到另一端,但是,請(qǐng)注意沒有附加正確的錯(cuò)誤處理程序。如果無法正確接收數(shù)據(jù)塊,Readable源或gzip流將不會(huì)被銷毀,pump是一個(gè)實(shí)用工具,如果其中一個(gè)流失敗或關(guān)閉,它將正確地銷毀管道中的所有流,并且在這種情況下是必須的!
只有Nodejs 8.x或更早版本才需要pump,對(duì)于Node 10.x或更高版本,引入pipeline來替換pump。這是一個(gè)模塊方法,用于在流傳輸之間轉(zhuǎn)發(fā)錯(cuò)誤和正確清理,并在管道完成時(shí)提供回調(diào)。
以下是使用管道的示例:
const { pipeline } = require("stream"); const fs = require("fs"); const zlib = require("zlib"); // Use the pipeline API to easily pipe a series of streams // together and get notified when the pipeline is fully done. // A pipeline to gzip a potentially huge video file efficiently: pipeline( fs.createReadStream("The.Matrix.1080p.mkv"), zlib.createGzip(), fs.createWriteStream("The.Matrix.1080p.mkv.gz"), (err) => { if (err) { console.error("Pipeline failed", err); } else { console.log("Pipeline succeeded"); } } );
你還可以在管道上調(diào)用promisify以將其與async/await一起使用:
const stream = require("stream"); const fs = require("fs"); const zlib = require("zlib"); const pipeline = util.promisify(stream.pipeline); async function run() { try { await pipeline( fs.createReadStream("The.Matrix.1080p.mkv"), zlib.createGzip(), fs.createWriteStream("The.Matrix.1080p.mkv.gz"), ); console.log("Pipeline succeeded"); } catch (err) { console.error("Pipeline failed", err); } }太多的數(shù)據(jù),太快
有些情況下,Readable流可能會(huì)過快地為Writable提供數(shù)據(jù) — 遠(yuǎn)遠(yuǎn)超過消費(fèi)者可以處理的數(shù)據(jù)!
當(dāng)發(fā)生這種情況時(shí),消費(fèi)者將開始排隊(duì)所有數(shù)據(jù)塊以供以后消費(fèi),寫入隊(duì)列將變得越來越長(zhǎng),因此在整個(gè)過程完成之前,必須將更多數(shù)據(jù)保存在內(nèi)存中。
寫入磁盤比從磁盤讀取要慢很多,因此,當(dāng)我們嘗試壓縮文件并將其寫入我們的硬盤時(shí),將發(fā)生背壓,因?yàn)閷懭氪疟P將無法跟上讀取的速度。
// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!" // Data will begin to build up on the read-side of the data buffer as // `write` tries to keep up with the incoming data flow. inp.pipe(gzip).pipe(outputFile);
這就是背壓機(jī)制很重要的原因,如果沒有背壓系統(tǒng),該進(jìn)程會(huì)耗盡系統(tǒng)的內(nèi)存,有效地減緩了其他進(jìn)程,并獨(dú)占你系統(tǒng)的大部分直到完成。
這導(dǎo)致了一些事情:
減緩所有其他當(dāng)前進(jìn)程。
一個(gè)非常超負(fù)荷的垃圾收集器。
內(nèi)存耗盡。
在下面的示例中,我們將取出.write()函數(shù)的返回值并將其更改為true,這有效地禁用了Node.js核心中的背壓支持,在任何對(duì)"modified"二進(jìn)制文件的引用中,我們正在談?wù)撛跊]有return ret;行的情況下運(yùn)行node二進(jìn)制,而改為return true;。
垃圾收集器上的過度負(fù)荷我們來看看快速基準(zhǔn)測(cè)試,使用上面的相同示例,我們進(jìn)行幾次試驗(yàn),以獲得兩個(gè)二進(jìn)制的中位時(shí)間。
trial (#) | `node` binary (ms) | modified `node` binary (ms) ================================================================= 1 | 56924 | 55011 2 | 52686 | 55869 3 | 59479 | 54043 4 | 54473 | 55229 5 | 52933 | 59723 ================================================================= average time: | 55299 | 55975
兩者都需要大約一分鐘來運(yùn)行,因此根本沒有太大差別,但讓我們仔細(xì)看看以確認(rèn)我們的懷疑是否正確,我們使用Linux工具dtrace來評(píng)估V8垃圾收集器發(fā)生了什么。
GC(垃圾收集器)測(cè)量時(shí)間表示垃圾收集器完成單次掃描的完整周期的間隔:
approx. time (ms) | GC (ms) | modified GC (ms) ================================================= 0 | 0 | 0 1 | 0 | 0 40 | 0 | 2 170 | 3 | 1 300 | 3 | 1 * * * * * * * * * 39000 | 6 | 26 42000 | 6 | 21 47000 | 5 | 32 50000 | 8 | 28 54000 | 6 | 35
雖然這兩個(gè)過程開始時(shí)相同,但似乎以相同的速率運(yùn)行GC,很明顯,在適當(dāng)工作的背壓系統(tǒng)幾秒鐘后,它將GC負(fù)載分布在4-8毫秒的一致間隔內(nèi),直到數(shù)據(jù)傳輸結(jié)束。
但是,當(dāng)背壓系統(tǒng)不到位時(shí),V8垃圾收集開始拖延,正常二進(jìn)制文件在一分鐘內(nèi)調(diào)用GC約75次,然而,修改后的二進(jìn)制文件僅觸發(fā)36次。
這是由于內(nèi)存使用量增加而累積的緩慢而漸進(jìn)的債務(wù),隨著數(shù)據(jù)傳輸,在沒有背壓系統(tǒng)的情況下,每個(gè)塊傳輸使用更多內(nèi)存。
分配的內(nèi)存越多,GC在一次掃描中需要處理的內(nèi)存就越多,掃描越大,GC就越需要決定可以釋放什么,并且在更大的內(nèi)存空間中掃描分離的指針將消耗更多的計(jì)算能力。
內(nèi)存耗盡為確定每個(gè)二進(jìn)制的內(nèi)存消耗,我們使用/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js多帶帶為每個(gè)進(jìn)程計(jì)時(shí)。
這是正常二進(jìn)制的輸出:
Respecting the return value of .write() ============================================= real 58.88 user 56.79 sys 8.79 87810048 maximum resident set size 0 average shared memory size 0 average unshared data size 0 average unshared stack size 19427 page reclaims 3134 page faults 0 swaps 5 block input operations 194 block output operations 0 messages sent 0 messages received 1 signals received 12 voluntary context switches 666037 involuntary context switches
虛擬內(nèi)存占用的最大字節(jié)大小約為87.81mb。
現(xiàn)在更改.write()函數(shù)的返回值,我們得到:
Without respecting the return value of .write(): ================================================== real 54.48 user 53.15 sys 7.43 1524965376 maximum resident set size 0 average shared memory size 0 average unshared data size 0 average unshared stack size 373617 page reclaims 3139 page faults 0 swaps 18 block input operations 199 block output operations 0 messages sent 0 messages received 1 signals received 25 voluntary context switches 629566 involuntary context switches
虛擬內(nèi)存占用的最大字節(jié)大小約為1.52gb。
如果沒有流來委托背壓,則分配的內(nèi)存空間要大一個(gè)數(shù)量級(jí) — 同一進(jìn)程之間的巨大差異!
這個(gè)實(shí)驗(yàn)展示了Node.js的反壓機(jī)制是如何優(yōu)化和節(jié)省成本的,現(xiàn)在,讓我們分析一下它是如何工作的!
背壓如何解決這些問題?將數(shù)據(jù)從一個(gè)進(jìn)程傳輸?shù)搅硪粋€(gè)進(jìn)程有不同的函數(shù),在Node.js中,有一個(gè)名為.pipe()的內(nèi)部?jī)?nèi)置函數(shù),還有其他包也可以使用!但最終,在這個(gè)過程的基本層面,我們有兩個(gè)獨(dú)立的組件:數(shù)據(jù)來源和消費(fèi)者。
當(dāng)從源調(diào)用.pipe()時(shí),它向消費(fèi)者發(fā)出信號(hào),告知有數(shù)據(jù)要傳輸,管道函數(shù)有助于為事件觸發(fā)器設(shè)置適當(dāng)?shù)谋硥洪]合。
在Node.js中,源是Readable流,而消費(fèi)者是Writable流(這些都可以與Duplex或Transform流互換,但這超出了本指南的范圍)。
觸發(fā)背壓的時(shí)刻可以精確地縮小到Writable的.write()函數(shù)的返回值,當(dāng)然,該返回值由幾個(gè)條件決定。
在數(shù)據(jù)緩沖區(qū)已超過highWaterMark或?qū)懭腙?duì)列當(dāng)前正忙的任何情況下,.write()將返回false。
當(dāng)返回false值時(shí),背壓系統(tǒng)啟動(dòng),它會(huì)暫停傳入的Readable流發(fā)送任何數(shù)據(jù),并等待消費(fèi)者再次準(zhǔn)備就緒,清空數(shù)據(jù)緩沖區(qū)后,將發(fā)出.drain()事件并恢復(fù)傳入的數(shù)據(jù)流。
隊(duì)列完成后,背壓將允許再次發(fā)送數(shù)據(jù),正在使用的內(nèi)存空間將自行釋放并為下一批數(shù)據(jù)做好準(zhǔn)備。
這有效地允許在任何給定時(shí)間為.pipe()函數(shù)使用固定數(shù)量的內(nèi)存,沒有內(nèi)存泄漏,沒有無限緩沖,垃圾收集器只需要處理內(nèi)存中的一個(gè)區(qū)域!
那么,如果背壓如此重要,為什么你(可能)沒有聽說過它?答案很簡(jiǎn)單:Node.js會(huì)自動(dòng)為你完成所有這些工作。
那太好了!但是當(dāng)我們?cè)噲D了解如何實(shí)現(xiàn)我們自己的自定義流時(shí),也不是那么好。
注意:在大多數(shù)機(jī)器中,有一個(gè)字節(jié)大小可以確定緩沖區(qū)何時(shí)已滿(在不同的機(jī)器上會(huì)有所不同),Node.js允許你設(shè)置自己的自定義highWaterMark,但通常,默認(rèn)設(shè)置為16kb(16384,或objectMode流為16),在你可能希望提高該值的情況下,可以嘗試,但是要小心!
.pipe()的生命周期為了更好地理解背壓,下面是一個(gè)關(guān)于Readable流的生命周期的流程圖,該流被管道傳輸?shù)?b>Writable流中:
+===================+ x--> Piping functions +--> src.pipe(dest) | x are set up during |===================| x the .pipe method. | Event callbacks | +===============+ x |-------------------| | Your Data | x They exist outside | .on("close", cb) | +=======+=======+ x the data flow, but | .on("data", cb) | | x importantly attach | .on("drain", cb) | | x events, and their | .on("unpipe", cb) | +---------v---------+ x respective callbacks. | .on("error", cb) | | Readable Stream +----+ | .on("finish", cb) | +-^-------^-------^-+ | | .on("end", cb) | ^ | ^ | +-------------------+ | | | | | ^ | | ^ ^ ^ | +-------------------+ +=================+ ^ | ^ +----> Writable Stream +---------> .write(chunk) | | | | +-------------------+ +=======+=========+ | | | | | ^ | +------------------v---------+ ^ | +-> if (!chunk) | Is this chunk too big? | ^ | | emit .end(); | Is the queue busy? | | | +-> else +-------+----------------+---+ | ^ | emit .write(); | | | ^ ^ +--v---+ +---v---+ | | ^-----------------------------------< No | | Yes | ^ | +------+ +---v---+ ^ | | | ^ emit .pause(); +=================+ | | ^---------------^-----------------------+ return false; <-----+---+ | +=================+ | | | ^ when queue is empty +============+ | ^------------^-----------------------< Buffering | | | |============| | +> emit .drain(); | ^Buffer^ | | +> emit .resume(); +------------+ | | ^Buffer^ | | +------------+ add chunk to queue | | <---^---------------------< +============+
注意:如果要設(shè)置管道以將一些流鏈接在一起來操作數(shù)據(jù),則很可能會(huì)實(shí)現(xiàn)Transform流。
在這種情況下,你的Readable流的輸出將輸入到Transform中,并將管道到Writable中。
Readable.pipe(Transformable).pipe(Writable);
背壓將自動(dòng)應(yīng)用,但請(qǐng)注意,Transform流的輸入和輸出highWaterMark都可能被操縱并將影響背壓系統(tǒng)。
背壓指南從Node.js v0.10開始,Stream類提供了通過使用這些相應(yīng)函數(shù)的下劃線版本來修改.read()或.write()的行為的功能(._read()和._write())。
對(duì)于實(shí)現(xiàn)Readable流和Writable流,有文檔化的指南,我們假設(shè)你已閱讀過這些內(nèi)容,下一節(jié)將更深入一些。
實(shí)現(xiàn)自定義流時(shí)要遵守的規(guī)則流的黃金法則始終是尊重背壓,最佳實(shí)踐的構(gòu)成是非矛盾的實(shí)踐,只要你小心避免與內(nèi)部背壓支持相沖突的行為,你就可以確定你遵循良好做法。
一般來說:
如果你沒有被要求,永遠(yuǎn)不要.push()。
永遠(yuǎn)不要在返回false后調(diào)用.write(),而是等待"drain"。
流在不同的Node.js版本和你使用的庫之間有變化,小心并測(cè)試一下。
注意:關(guān)于第3點(diǎn),構(gòu)建瀏覽器流的非常有用的包是readable-stream,Rodd Vagg撰寫了一篇很棒的博客文章,描述了這個(gè)庫的實(shí)用性,簡(jiǎn)而言之,它為Readable流提供了一種自動(dòng)優(yōu)雅降級(jí),并支持舊版本的瀏覽器和Node.js。
Readable流的特定規(guī)則到目前為止,我們已經(jīng)了解了.write()如何影響背壓,并將重點(diǎn)放在Writable流上,由于Node.js的功能,數(shù)據(jù)在技術(shù)上從Readable流向下游Writable。但是,正如我們可以在數(shù)據(jù)、物質(zhì)或能量的任何傳輸中觀察到的那樣,源與目標(biāo)一樣重要,Readable流對(duì)于如何處理背壓至關(guān)重要。
這兩個(gè)過程都相互依賴,有效地進(jìn)行通信,如果Readable忽略Writable流要求它停止發(fā)送數(shù)據(jù)的時(shí)候,那么.write()的返回值不正確就會(huì)有問題。
因此,關(guān)于.write()返回,我們還必須尊重._read()方法中使用的.push()的返回值,如果.push()返回false值,則流將停止從源讀取,否則,它將繼續(xù)而不會(huì)停頓。
以下是使用.push()的不好做法示例:
// This is problematic as it completely ignores return value from push // which may be a signal for backpressure from the destination stream! class MyReadable extends Readable { _read(size) { let chunk; while (null !== (chunk = getNextChunk())) { this.push(chunk); } } }
此外,在自定義流之外,存在忽略背壓的陷阱,在這個(gè)良好的實(shí)踐的反例中,應(yīng)用程序的代碼會(huì)在數(shù)據(jù)可用時(shí)強(qiáng)制通過(由.data事件發(fā)出信號(hào)):
// This ignores the backpressure mechanisms Node.js has set in place, // and unconditionally pushes through data, regardless if the // destination stream is ready for it or not. readable.on("data", (data) => writable.write(data) );Writable流的特定規(guī)則
回想一下.write()可能會(huì)根據(jù)某些條件返回true或false,幸運(yùn)的是,在構(gòu)建我們自己的Writable流時(shí),流狀態(tài)機(jī)將處理我們的回調(diào)并確定何時(shí)處理背壓并為我們優(yōu)化數(shù)據(jù)流。
但是,當(dāng)我們想直接使用Writable時(shí),我們必須尊重.write()返回值并密切注意這些條件:
如果寫隊(duì)列忙,.write()將返回false。
如果數(shù)據(jù)塊太大,.write()將返回false(該值由變量highWaterMark指示)。
// This writable is invalid because of the async nature of JavaScript callbacks. // Without a return statement for each callback prior to the last, // there is a great chance multiple callbacks will be called. class MyWritable extends Writable { _write(chunk, encoding, callback) { if (chunk.toString().indexOf("a") >= 0) callback(); else if (chunk.toString().indexOf("b") >= 0) callback(); callback(); } } // The proper way to write this would be: if (chunk.contains("a")) return callback(); else if (chunk.contains("b")) return callback(); callback();
在實(shí)現(xiàn)._writev()時(shí)還需要注意一些事項(xiàng),該函數(shù)與.cork()結(jié)合使用,但寫入時(shí)有一個(gè)常見錯(cuò)誤:
// Using .uncork() twice here makes two calls on the C++ layer, rendering the // cork/uncork technique useless. ws.cork(); ws.write("hello "); ws.write("world "); ws.uncork(); ws.cork(); ws.write("from "); ws.write("Matteo"); ws.uncork(); // The correct way to write this is to utilize process.nextTick(), which fires // on the next event loop. ws.cork(); ws.write("hello "); ws.write("world "); process.nextTick(doUncork, ws); ws.cork(); ws.write("from "); ws.write("Matteo"); process.nextTick(doUncork, ws); // as a global function function doUncork(stream) { stream.uncork(); }
.cork()可以被調(diào)用多次,我們只需要小心調(diào)用.uncork()相同的次數(shù),使其再次流動(dòng)。
結(jié)論Streams是Node.js中經(jīng)常使用的模塊,它們對(duì)于內(nèi)部結(jié)構(gòu)非常重要,對(duì)于開發(fā)人員來說,它們可以跨Node.js模塊生態(tài)系統(tǒng)進(jìn)行擴(kuò)展和連接。
希望你現(xiàn)在能夠進(jìn)行故障排除,安全地編寫你自己的Writable和Readable流,并考慮背壓,并與同事和朋友分享你的知識(shí)。
在使用Node.js構(gòu)建應(yīng)用程序時(shí),請(qǐng)務(wù)必閱讀有關(guān)其他API函數(shù)的Stream的更多信息,以幫助改進(jìn)和釋放你的流功能。
上一篇:使用不同的文件系統(tǒng) 下一篇:域模塊剖析文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/100377.html
Node.js 指南 Node.js?是基于Chrome的V8 JavaScript引擎構(gòu)建的JavaScript運(yùn)行時(shí)。 常規(guī) 關(guān)于Node.js 入門指南 輕松分析Node.js應(yīng)用程序 Docker化Node.js Web應(yīng)用程序 遷移到安全的Buffer構(gòu)造函數(shù) Node.js核心概念 阻塞與非阻塞概述 Node.js事件循環(huán)、定時(shí)器和process.nextTick() 不要阻塞事...
摘要:避免使用最低公分母方法你可能想讓你的程序像最低公分母文件系統(tǒng)一樣,通過將所有文件名規(guī)范化為大寫,將所有文件名規(guī)范化為格式,并將所有文件時(shí)間戳標(biāo)準(zhǔn)化為秒分辨率,這是最小公分母的方法。 使用不同的文件系統(tǒng) Node公開了文件系統(tǒng)的許多功能,但并非所有文件系統(tǒng)都相似,以下是建議的最佳實(shí)踐,以便在使用不同的文件系統(tǒng)時(shí)保持代碼簡(jiǎn)單和安全。 文件系統(tǒng)行為 在使用文件系統(tǒng)之前,你需要知道它的行為方式...
摘要:快速檢查可能告訴我們,簡(jiǎn)單地從的域處理程序拋出將允許然后捕獲異常并執(zhí)行其自己的錯(cuò)誤處理程序,雖然情況并非如此,檢查后,你會(huì)看到堆棧只包含。 域模塊剖析 可用性問題 隱式行為 開發(fā)人員可以創(chuàng)建新域,然后只需運(yùn)行domain.enter(),然后,它充當(dāng)將來拋出者無法觀察到的任何異常的萬能捕捉器,允許模塊作者攔截不同模塊中不相關(guān)代碼的異常,防止代碼的發(fā)起者知道自己的異常。 以下是一個(gè)間接鏈...
摘要:相對(duì)于最大的更新就是把對(duì)背壓?jiǎn)栴}的處理邏輯從中抽取出來產(chǎn)生了新的可觀察對(duì)象。由于基于發(fā)射的數(shù)據(jù)流,以及對(duì)數(shù)據(jù)加工處理的各操作符都添加了背壓支持,附加了額外的邏輯,其運(yùn)行效率要比慢得多。 背壓(backpressure)當(dāng)上下游在不同的線程中,通過Observable發(fā)射,處理,響應(yīng)數(shù)據(jù)流時(shí),如果上游發(fā)射數(shù)據(jù)的速度快于下游接收處理數(shù)據(jù)的速度,這樣對(duì)于那些沒來得及處理的數(shù)據(jù)就會(huì)造成積壓,這...
閱讀 3125·2021-11-18 10:02
閱讀 3413·2021-11-02 14:48
閱讀 3456·2019-08-30 13:52
閱讀 621·2019-08-29 17:10
閱讀 2150·2019-08-29 12:53
閱讀 1474·2019-08-29 12:53
閱讀 1090·2019-08-29 12:25
閱讀 2227·2019-08-29 12:17