摘要:使用線程池可以節(jié)省那種系統(tǒng)開(kāi)銷,同時(shí)允許實(shí)現(xiàn)者利用并行硬件的優(yōu)勢(shì)。但是對(duì)于連接生存期比較長(zhǎng)的協(xié)議來(lái)說(shuō),線程池的大小仍然限制了系統(tǒng)同時(shí)可以處理的客戶端數(shù)量。
NIO主要包含兩部分,Selector和Channel、Buffer。
為什么需要需要NIO基本的Java套接字對(duì)于小規(guī)模系統(tǒng)可以很好地運(yùn)行,但當(dāng)涉及同時(shí)處理上千個(gè)客戶端的服務(wù)器時(shí),可能就會(huì)產(chǎn)生一些問(wèn)題。由于創(chuàng)建、維護(hù)、切換線程需要的系統(tǒng)開(kāi)銷,一客戶一線程的方式在系統(tǒng)擴(kuò)展性方面受到了限制。使用線程池可以節(jié)省那種系統(tǒng)開(kāi)銷,同時(shí)允許實(shí)現(xiàn)者利用并行硬件的優(yōu)勢(shì)。
但是對(duì)于連接生存期比較長(zhǎng)的協(xié)議來(lái)說(shuō),線程池的大小仍然限制了系統(tǒng)同時(shí)可以處理的客戶端數(shù)量。
另外對(duì)于服務(wù)器需要由不同客戶端同時(shí)訪問(wèn)和修改的信息時(shí),對(duì)于多線程就得進(jìn)行同步,這會(huì)變得更加復(fù)雜,即使用同步機(jī)制將增加更多的系統(tǒng)調(diào)度和上下文切換開(kāi)銷,而程序員對(duì)這些開(kāi)銷又無(wú)法控制。
由于多線程的同步的復(fù)雜性,一些程序員寧愿繼續(xù)使用單線程方法,這類服務(wù)器只用一個(gè)線程來(lái)處理所有客戶端——不是順序處理,而是一次全部處理。這種服務(wù)器不能為任何客戶端提供I/O操作的阻塞等待,而必須排他地使用非阻塞方式(nonblocking)I/O。
前面的while true,不斷地輪詢(poll)accept方法,這種忙等(busy waiting)方法會(huì)引入系統(tǒng)開(kāi)銷,因?yàn)槌绦蛐枰磸?fù)循環(huán)地連接I/O源,卻又發(fā)現(xiàn)什么都不用做。
我們需要一種方法來(lái)一次輪詢一組客戶端,以查找那個(gè)客戶端需要服務(wù),這正是NIO要介紹的Selector和Channel的抽象關(guān)鍵點(diǎn)。
一個(gè)Channel實(shí)例代表了一個(gè)可輪詢(pollable)的I/O目標(biāo),如套接字(或一個(gè)文件、設(shè)備等)。Channel能夠注冊(cè)一個(gè)Selector類的實(shí)例。
Selector的select方法允許你詢問(wèn)在一組信道中,哪一個(gè)當(dāng)前需要服務(wù)(被接受、讀、寫(xiě))。
Stream的抽象,好處是隱藏了底層緩沖區(qū)的有限性,提供了一個(gè)能夠容納任意長(zhǎng)度數(shù)據(jù)的容器的假象。要實(shí)現(xiàn)這樣一個(gè)假象,要么會(huì)產(chǎn)生大量的內(nèi)存開(kāi)銷,要么會(huì)引入大量的上下文切換,不好控制。
使用Buffer抽象的原因是:Buffer抽象代表了一個(gè)有限容量(finite-capacity)的數(shù)據(jù)容器——其本質(zhì)是一個(gè)數(shù)組,由指針指示了在哪存放數(shù)據(jù)和從哪讀取數(shù)據(jù)。使用Buffer的好處是:
1)與讀寫(xiě)緩沖區(qū)數(shù)據(jù)相關(guān)聯(lián)的系統(tǒng)開(kāi)銷都暴露給了程序員。例如,如果想要向緩沖區(qū)存入數(shù)據(jù),但是又沒(méi)有足夠的空間時(shí),就必須采取一些措施來(lái)獲得空間(即移出一些數(shù)據(jù),或移開(kāi)已經(jīng)在那個(gè)位置的數(shù)據(jù)來(lái)獲得空間,或者創(chuàng)建一個(gè)新的新的實(shí)例)。這意味著需要額外的工作,但是你可以控制它什么時(shí)候發(fā)生,如何發(fā)生,以及是否發(fā)生。
2)一些對(duì)Java對(duì)象的特殊Buffer映射操作能夠直接操作底層平臺(tái)的資源(例如操作系統(tǒng)的緩沖區(qū)),這些操作節(jié)省了在不同地址空間中復(fù)制數(shù)據(jù)的開(kāi)銷。
綜上,Channel實(shí)例代表了一個(gè)與設(shè)備的連接,通過(guò)它可以進(jìn)行輸入輸出操作。信道(channel)和套接字(socket)的不同之處在于:channel通常需要調(diào)用靜態(tài)工廠方法來(lái)獲取實(shí)例。channel使用的不是流,而是使用緩沖區(qū)來(lái)發(fā)送或讀取數(shù)據(jù)。
Buffer有固定的、有限的容量,并由內(nèi)部狀態(tài)記錄了有多少數(shù)據(jù)放入或取出,就像是一個(gè)有限容量的隊(duì)列一樣。
SelectorNIO的強(qiáng)大功能部分來(lái)自于channel的非阻塞特性。accept可能因?yàn)榈却粋€(gè)客戶端連接而阻塞,read可能因?yàn)闆](méi)有數(shù)據(jù)可讀而阻塞,直到連接的另一端傳來(lái)新數(shù)據(jù)。
總的來(lái)說(shuō),創(chuàng)建/接收連接或讀寫(xiě)數(shù)據(jù)等I/O調(diào)用,都可能無(wú)限期地阻塞等待,直到底層的網(wǎng)絡(luò)實(shí)現(xiàn)發(fā)生了什么。慢速的、有損耗的網(wǎng)絡(luò),或僅僅是簡(jiǎn)單的網(wǎng)絡(luò)故障都可能導(dǎo)致任意時(shí)間的延遲。
而NIO則立即返回:
public class TCPEchoClientNonblocking { public static void main(String args[]) throws Exception { if ((args.length < 2) || (args.length > 3)) // Test for correct # of args throw new IllegalArgumentException("Parameter(s):[ ]"); String server = args[0]; // Server name or IP address // Convert input String to bytes using the default charset byte[] argument = args[1].getBytes(); int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7; // Create channel and set to nonblocking SocketChannel clntChan = SocketChannel.open(); clntChan.configureBlocking(false); // Initiate connection to server and repeatedly poll until complete if (!clntChan.connect(new InetSocketAddress(server, servPort))) { while (!clntChan.finishConnect()) { System.out.print("."); // Do something else } } ByteBuffer writeBuf = ByteBuffer.wrap(argument); ByteBuffer readBuf = ByteBuffer.allocate(argument.length); int totalBytesRcvd = 0; // Total bytes received so far int bytesRcvd; // Bytes received in last read while (totalBytesRcvd < argument.length) { if (writeBuf.hasRemaining()) { clntChan.write(writeBuf); } if ((bytesRcvd = clntChan.read(readBuf)) == -1) { throw new SocketException("Connection closed prematurely"); } totalBytesRcvd += bytesRcvd; System.out.print("."); // Do something else } System.out.println("Received: " + // convert to String per default charset new String(readBuf.array(), 0, totalBytesRcvd)); clntChan.close(); } }
上面的輪詢僅僅是演示用。
需要使用Selector類來(lái)避免忙等的輪詢??紤]一個(gè)即時(shí)的消息服務(wù)器,可能有上千個(gè)客戶端同時(shí)連接到了服務(wù)器,但任何時(shí)刻都只有非常少量的消息需要讀取和分發(fā)。這就需要一種方法阻塞等待,直到至少有一個(gè)信道可以進(jìn)行I/O操作,并指出是哪個(gè)信道。NIO的選擇器就實(shí)現(xiàn)了這樣的功能。一個(gè)Selector實(shí)例可以同時(shí)檢查一組信道的I/O狀態(tài)。用專業(yè)術(shù)語(yǔ)來(lái)說(shuō),選擇器就是一個(gè)多路開(kāi)關(guān)選擇器,因?yàn)橐粋€(gè)選擇器能夠管理多個(gè)信道上的I/O操作。
要使用選擇器,需要?jiǎng)?chuàng)建一個(gè)Selector實(shí)例并將其注冊(cè)到想要監(jiān)控的信道上(注意,這要通過(guò)channel的方法實(shí)現(xiàn),而不是使用selector的方法)。最后,調(diào)用選擇器的select方法,該方法會(huì)阻塞等待,直到還有一個(gè)或更多的信道準(zhǔn)備好了I/O操作或等待超時(shí)。select方法返回可進(jìn)行I/O操作的信道數(shù)量。
public class TCPServerSelector { private static final int BUFSIZE = 256; // Buffer size (bytes) private static final int TIMEOUT = 3000; // Wait timeout (milliseconds) public static void main(String[] args) throws IOException { if (args.length < 1) { // Test for correct # of args throw new IllegalArgumentException("Parameter(s):..."); } // Create a selector to multiplex listening sockets and connections Selector selector = Selector.open(); // Create listening socket channel for each port and register selector for (String arg : args) { ServerSocketChannel listnChannel = ServerSocketChannel.open(); listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg))); listnChannel.configureBlocking(false); // must be nonblocking to register // Register selector with channel. The returned key is ignored listnChannel.register(selector, SelectionKey.OP_ACCEPT); } // Create a handler that will implement the protocol TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE); while (true) { // Run forever, processing available I/O operations // Wait for some channel to be ready (or timeout) if (selector.select(TIMEOUT) == 0) { // returns # of ready chans System.out.print("."); continue; } // Get iterator on set of keys with I/O to process Iterator keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); // Key is bit mask // Server socket channel has pending connection requests? if (key.isAcceptable()) { protocol.handleAccept(key); } // Client socket channel has pending data? if (key.isReadable()) { protocol.handleRead(key); } // Client socket channel is available for writing and // key is valid (i.e., channel not closed)? if (key.isValid() && key.isWritable()) { protocol.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
由于select方法只是向selector所關(guān)聯(lián)的鍵集合中添加元素,因此,如果不移除每個(gè)處理過(guò)的鍵,它就會(huì)在下次調(diào)用select方法時(shí)仍然保留在集合中,而且可能會(huì)有無(wú)用的操作來(lái)調(diào)用它。
具體的處理方法
public class EchoSelectorProtocol implements TCPProtocol { private int bufSize; // Size of I/O buffer public EchoSelectorProtocol(int bufSize) { this.bufSize = bufSize; } public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); // Must be nonblocking to register // Register the selector with new channel for read and attach byte buffer clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer .allocate(bufSize)); } public void handleRead(SelectionKey key) throws IOException { // Client socket channel has pending data SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { // Indicate via key that reading/writing are both of interest now. key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { /* * Channel is available for writing, and key is valid (i.e., client channel * not closed). */ // Retrieve data read earlier ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); // Prepare buffer for writing SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { // Buffer completely written? // Nothing left, so no longer interested in writes key.interestOps(SelectionKey.OP_READ); } buf.compact(); // Make room for more data to be read in } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/65929.html
摘要:學(xué)習(xí)和掌握技術(shù)已經(jīng)不是一個(gè)攻城獅的加分技能,而是一個(gè)必備技能。是雙向的,不僅可以讀取數(shù)據(jù)還能保存數(shù)據(jù),程序不能直接讀寫(xiě)通道,只與緩沖區(qū)交互為了讓大家不被高并發(fā)與大量連接處理問(wèn)題所困擾,動(dòng)力節(jié)點(diǎn)推出了高效處理模型應(yīng)用教程。 大家肯定了解Java IO, 但是對(duì)于NIO一般是陌生的,而現(xiàn)在使用到NIO的場(chǎng)景越來(lái)越多,很多技術(shù)框...
摘要:內(nèi)存溢出的情況就是從類加載器加載的時(shí)候開(kāi)始出現(xiàn)的,內(nèi)存溢出分為兩大類和。以下舉出個(gè)內(nèi)存溢出的情況,并通過(guò)實(shí)例代碼的方式講解了是如何出現(xiàn)內(nèi)存溢出的。內(nèi)存溢出問(wèn)題描述元空間的溢出,系統(tǒng)會(huì)拋出。這樣就會(huì)造成棧的內(nèi)存溢出。 導(dǎo)言: 對(duì)于java程序員來(lái)說(shuō),在虛擬機(jī)自動(dòng)內(nèi)存管理機(jī)制的幫助下,不需要自己實(shí)現(xiàn)釋放內(nèi)存,不容易出現(xiàn)內(nèi)存泄漏和內(nèi)存溢出的問(wèn)題,由虛擬機(jī)管理內(nèi)存這一切看起來(lái)非常美好,但是一旦...
摘要:基礎(chǔ)知識(shí)基礎(chǔ)語(yǔ)法基礎(chǔ)知識(shí)編程第一步基礎(chǔ)知識(shí)基本數(shù)據(jù)類型基礎(chǔ)知識(shí)解釋器基礎(chǔ)知識(shí)注釋基礎(chǔ)知識(shí)運(yùn)算符基礎(chǔ)知識(shí)數(shù)字基礎(chǔ)知識(shí)字符串基礎(chǔ)知識(shí)列表基礎(chǔ)知識(shí)元組基礎(chǔ)知識(shí)字典基礎(chǔ)知識(shí)條件控制基礎(chǔ)知識(shí)循環(huán)基礎(chǔ)知識(shí)迭代器與生成器基礎(chǔ)知識(shí)函數(shù)基礎(chǔ)知識(shí)數(shù)據(jù)結(jié)構(gòu)基礎(chǔ)知 Python3基礎(chǔ)知識(shí) | 基礎(chǔ)語(yǔ)法?Python3基礎(chǔ)知識(shí) | 編程第一步?Python3基礎(chǔ)知識(shí) | 基本數(shù)據(jù)類型Python3基礎(chǔ)知識(shí) | ...
閱讀 2560·2021-10-09 09:59
閱讀 2344·2021-09-23 11:30
閱讀 2713·2019-08-30 15:56
閱讀 1224·2019-08-30 14:00
閱讀 3028·2019-08-29 12:37
閱讀 1349·2019-08-28 18:16
閱讀 1730·2019-08-27 10:56
閱讀 1101·2019-08-26 17:23