摘要:單線程模式流程服務(wù)器端的是一個線程對象,該線程會啟動事件循環(huán),并使用選擇器來實現(xiàn)的多路復(fù)用。線程池分配一個線程給這個,即,將關(guān)注的事件以及對應(yīng)的事件處理器注冊到線程中。多線程模式將接受客戶端的連接請求和與該客戶端的通信分在了兩個線程來完成。
Reactor模式
反應(yīng)堆模式:“反應(yīng)”器名字中”反應(yīng)“的由來:
“反應(yīng)”即“倒置”,“控制逆轉(zhuǎn)”,具體事件處理程序不調(diào)用反應(yīng)器,而向反應(yīng)器注冊一個事件處理器,表示自己對某些事件感興趣,有時間來了,具體事件處理程序通過事件處理器對某個指定的事件發(fā)生做出反應(yīng)。
單線程Reactor模式流程:①服務(wù)器端的Reactor是一個線程對象,該線程會啟動事件循環(huán),并使用Selector(選擇器)來實現(xiàn)IO的多路復(fù)用。channel注冊一個Acceptor事件處理器到Reactor中,Acceptor事件處理器所關(guān)注的事件是ACCEPT事件,這樣Reactor會監(jiān)聽客戶端向服務(wù)器端發(fā)起的連接請求事件(ACCEPT事件)。
②客戶端向服務(wù)器端發(fā)起一個連接請求,Reactor監(jiān)聽到了該ACCEPT事件的發(fā)生并將該ACCEPT事件派發(fā)給相應(yīng)的Acceptor處理器來進行處理。Acceptor處理器通過accept()方法得到與這個客戶端對應(yīng)的連接(SocketChannel),然后將該連接所關(guān)注的READ事件以及對應(yīng)的READ事件處理器注冊到Reactor中,這樣一來Reactor就會監(jiān)聽該連接的READ事件了。
③當(dāng)Reactor監(jiān)聽到有讀或者寫事件發(fā)生時,將相關(guān)的事件派發(fā)給對應(yīng)的處理器進行處理。比如,讀處理器會通過SocketChannel的read()方法讀取數(shù)據(jù),此時read()操作可以直接讀取到數(shù)據(jù),而不會堵塞與等待可讀的數(shù)據(jù)到來。
④每當(dāng)處理完所有就緒的感興趣的I/O事件后,Reactor線程會再次執(zhí)行select()阻塞等待新的事件就緒并將其分派給對應(yīng)處理器進行處理。
注意,Reactor的單線程模式的單線程主要是針對于I/O操作而言,也就是所有的I/O的accept()、read()、write()以及connect()操作都在一個線程上完成的。
基于單線程反應(yīng)器模式手寫一個NIO通信先簡單介紹NIO中幾個重要對象:
Selector
Selector的英文含義是“選擇器”,也可以稱為為“輪詢代理器”、“事件訂閱器”、“channel容器管理機”都行。
事件訂閱和Channel管理: 應(yīng)用程序?qū)⑾騍elector對象注冊需要它關(guān)注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣。Selector中也會維護一個“已經(jīng)注冊的Channel”的容器。
Channels
通道,被建立的一個應(yīng)用程序和操作系統(tǒng)交互事件、傳遞內(nèi)容的渠道(注意是連接到操作系統(tǒng))。那么既然是和操作系統(tǒng)進行內(nèi)容的傳遞,那么說明應(yīng)用程序可以通過通道讀取數(shù)據(jù),也可以通過通道向操作系統(tǒng)寫數(shù)據(jù)。
所有被Selector(選擇器)注冊的通道,只能是繼承了SelectableChannel類的子類。
ServerSocketChannel:應(yīng)用服務(wù)器程序的監(jiān)聽通道。只有通過這個通道,應(yīng)用程序才能向操作系統(tǒng)注冊支持“多路復(fù)用IO”的端口監(jiān)聽。同時支持UDP協(xié)議和TCP協(xié)議。
ScoketChannel:TCP Socket套接字的監(jiān)聽通道,一個Socket套接字對應(yīng)了一個客戶端IP:端口 到
服務(wù)器IP:端口的通信連接。
DatagramChannel:UDP 數(shù)據(jù)報文的監(jiān)聽通道。
通道中的數(shù)據(jù)總是要先讀到一個Buffer,或者總是要從一個Buffer中寫入。
服務(wù)端處理器:
/** * 類說明:nio通信服務(wù)端處理器 */ public class NioServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 構(gòu)造方法 * @param port 指定要監(jiān)聽的端口號 */ public NioServerHandle(int port) { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector,SelectionKey.OP_ACCEPT); started = true; System.out.println("服務(wù)器已啟動,端口號:"+port); } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { //循環(huán)遍歷selector while(started){ try{ //阻塞,只有當(dāng)至少一個注冊的事件發(fā)生的時候才會繼續(xù). selector.select(); Setkeys = selector.selectedKeys(); Iterator it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Throwable t){ t.printStackTrace(); } } //selector關(guān)閉后會自動釋放里面管理的資源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //處理新接入的請求消息 if(key.isAcceptable()){ //獲得關(guān)心當(dāng)前事件的channel ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); //通過ServerSocketChannel的accept創(chuàng)建SocketChannel實例 //完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立 SocketChannel sc = ssc.accept(); System.out.println("======socket channel 建立連接" ); //設(shè)置為非阻塞的 sc.configureBlocking(false); //連接已經(jīng)完成了,可以開始關(guān)心讀事件了 sc.register(selector,SelectionKey.OP_READ); } //讀消息 if(key.isReadable()){ System.out.println("======socket channel 數(shù)據(jù)準(zhǔn)備完成," + "可以去讀==讀取======="); SocketChannel sc = (SocketChannel) key.channel(); //創(chuàng)建ByteBuffer,并開辟一個1M的緩沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取請求碼流,返回讀取到的字節(jié)數(shù) int readBytes = sc.read(buffer); //讀取到字節(jié),對字節(jié)進行編解碼 if(readBytes>0){ //將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0, // 用于后續(xù)對緩沖區(qū)的讀取操作 buffer.flip(); //根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組 byte[] bytes = new byte[buffer.remaining()]; //將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中 buffer.get(bytes); String message = new String(bytes,"UTF-8"); System.out.println("服務(wù)器收到消息:" + message); //處理數(shù)據(jù) String result = response(message) ; //發(fā)送應(yīng)答消息 doWrite(sc,result); } //鏈路已經(jīng)關(guān)閉,釋放資源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //發(fā)送應(yīng)答消息 private void doWrite(SocketChannel channel,String response) throws IOException { //將消息編碼為字節(jié)數(shù)組 byte[] bytes = response.getBytes(); //根據(jù)數(shù)組容量創(chuàng)建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //將字節(jié)數(shù)組復(fù)制到緩沖區(qū) writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //發(fā)送緩沖區(qū)的字節(jié)數(shù)組 channel.write(writeBuffer); } } public class NioServer { private static NioServerHandle nioServerHandle; public static void start(){ if(nioServerHandle !=null) nioServerHandle.stop(); nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); } public static void main(String[] args){ start(); } }
客戶端處理器:
public class NioClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public NioClientHandle(String ip, int port) { this.host = ip; this.port = port; try { //創(chuàng)建選擇器 selector = Selector.open(); //打開通道 socketChannel = SocketChannel.open(); //如果為 true,則此通道將被置于阻塞模式; // 如果為 false,則此通道將被置于非阻塞模式 socketChannel.configureBlocking(false); started = true; } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } //循環(huán)遍歷selector while(started){ try { //阻塞,只有當(dāng)至少一個注冊的事件發(fā)生的時候才會繼續(xù) selector.select(); //獲取當(dāng)前有哪些事件可以使用 Setkeys = selector.selectedKeys(); //轉(zhuǎn)換為迭代器 Iterator it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { e.printStackTrace(); if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } //selector關(guān)閉后會自動釋放里面管理的資源 if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } //具體的事件處理方法 private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //獲得關(guān)心當(dāng)前事件的channel SocketChannel sc = (SocketChannel)key.channel(); if(key.isConnectable()){//連接事件 if(sc.finishConnect()){} else{System.exit(1);} } //有數(shù)據(jù)可讀事件 if(key.isReadable()){ //創(chuàng)建ByteBuffer,并開辟一個1M的緩沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取請求碼流,返回讀取到的字節(jié)數(shù) int readBytes = sc.read(buffer); //讀取到字節(jié),對字節(jié)進行編解碼 if(readBytes>0){ //將緩沖區(qū)當(dāng)前的limit設(shè)置為position,position=0, // 用于后續(xù)對緩沖區(qū)的讀取操作 buffer.flip(); //根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組 byte[] bytes = new byte[buffer.remaining()]; //將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中 buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("accept message:"+result); }else if(readBytes<0){ key.cancel(); sc.close(); } } } } //發(fā)送消息 private void doWrite(SocketChannel channel,String request) throws IOException { //將消息編碼為字節(jié)數(shù)組 byte[] bytes = request.getBytes(); //根據(jù)數(shù)組容量創(chuàng)建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //將字節(jié)數(shù)組復(fù)制到緩沖區(qū) writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //發(fā)送緩沖區(qū)的字節(jié)數(shù)組 channel.write(writeBuffer); } private void doConnect() throws IOException { /*如果此通道處于非阻塞模式, 則調(diào)用此方法將啟動非阻塞連接操作。 如果立即建立連接,就像本地連接可能發(fā)生的那樣,則此方法返回true。 否則,此方法返回false, 稍后必須通過調(diào)用finishConnect方法完成連接操作。*/ if(socketChannel.connect(new InetSocketAddress(host,port))){} else{ //連接還未完成,所以注冊連接就緒事件,向selector表示關(guān)注這個事件 socketChannel.register(selector,SelectionKey.OP_CONNECT); } } //寫數(shù)據(jù)對外暴露的API public void sendMsg(String msg) throws Exception{ socketChannel.register(selector,SelectionKey.OP_READ); doWrite(socketChannel,msg); } } public class NioClient { private static NioClientHandle nioClientHandle; public static void start(){ if(nioClientHandle !=null) nioClientHandle.stop(); nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT); new Thread(nioClientHandle,"Client").start(); } //向服務(wù)器發(fā)送消息 public static boolean sendMsg(String msg) throws Exception{ nioClientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception { start(); System.out.println("請輸入請求信息:"); Scanner scanner = new Scanner(System.in); while(NioClient.sendMsg(scanner.next())); } }
服務(wù)端過程:
啟動服務(wù)端,完成一些初始化工作,ServerSocketChannel綁定端口并且注冊接受連接事件.
循環(huán)里selector.select()阻塞,只有當(dāng)至少一個注冊的事件發(fā)生的時候才會繼續(xù),循環(huán)里面處理發(fā)生的注冊事件
注冊事件發(fā)生時交給處理器,若為接受連接則accept取出socketChannel并完成連接,然后就是關(guān)注read讀取事件即注冊,有數(shù)據(jù)讀取了則處理器讀取請求數(shù)據(jù)并返回.
客戶端過程:
啟動客戶端,完成一些初始化工作.
根據(jù)服務(wù)端ip及端口發(fā)起連接.
往服務(wù)端發(fā)送數(shù)據(jù),并注冊read讀取事件
循環(huán)里selector.select()阻塞,只有當(dāng)至少一個注冊的事件發(fā)生的時候才會繼續(xù),循環(huán)里面處理發(fā)生的注冊事件.
注冊事件發(fā)生時交給處理器,若為連接事件并且連接成功則跳過即不予處理等待讀取事件發(fā)送.
初始化工作如打開selector,channel,設(shè)置通道模式是否阻塞.
單線程Reactor,工作者線程池但在單線程Reactor模式中,不僅I/O操作在該Reactor線程上,連非I/O的業(yè)務(wù)操作也在該線程上進行處理了,這可能會大大延遲I/O請求的響應(yīng)。所以我們應(yīng)該將非I/O的業(yè)務(wù)邏輯操作從Reactor線程上卸載,以此來加速Reactor線程對I/O請求的響應(yīng).
添加了一個工作者線程池,并將非I/O操作從Reactor線程中移出轉(zhuǎn)交給工作者線程池來執(zhí)行。這樣能夠提高Reactor線程的I/O響應(yīng),不至于因為一些耗時的業(yè)務(wù)邏輯而延遲對后面I/O請求的處理。
改進的版本中,所以的I/O操作依舊由一個Reactor來完成,包括I/O的accept()、read()、write()以及connect()操作。
對于一些小容量應(yīng)用場景,可以使用單線程模型。但是對于高負載、大并發(fā)或大數(shù)據(jù)量的應(yīng)用場景卻不合適,主要原因如下:
① 一個NIO線程同時處理成百上千的鏈路,性能上無法支撐,即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的讀取和發(fā)送;
②當(dāng)NIO線程負載過重之后,處理速度將變慢,這會導(dǎo)致大量客戶端連接超時,超時之后往往會進行重發(fā),這更加重了NIO線程的負載,最終會導(dǎo)致大量消息積壓和處理超時,成為系統(tǒng)的性能瓶頸;
多Reactor線程模式Reactor線程池中的每一Reactor線程都會有自己的Selector、線程和分發(fā)的事件循環(huán)邏輯。
mainReactor可以只有一個,但subReactor一般會有多個。mainReactor線程主要負責(zé)接收客戶端的連接請求,然后將接收到的SocketChannel傳遞給subReactor,由subReactor來完成和客戶端的通信。
流程:
①注冊一個Acceptor事件處理器到mainReactor中,Acceptor事件處理器所關(guān)注的事件是ACCEPT事件,這樣mainReactor會監(jiān)聽客戶端向服務(wù)器端發(fā)起的連接請求事件(ACCEPT事件)。啟動mainReactor的事件循環(huán)。
②客戶端向服務(wù)器端發(fā)起一個連接請求,mainReactor監(jiān)聽到了該ACCEPT事件并將該ACCEPT事件派發(fā)給Acceptor處理器來進行處理。Acceptor處理器通過accept()方法得到與這個客戶端對應(yīng)的連接(SocketChannel),然后將這個SocketChannel傳遞給subReactor線程池。
③subReactor線程池分配一個subReactor線程給這個SocketChannel,即,將SocketChannel關(guān)注的READ事件以及對應(yīng)的READ事件處理器注冊到subReactor線程中。當(dāng)然你也注冊WRITE事件以及WRITE事件處理器到subReactor線程中以完成I/O寫操作。Reactor線程池中的每一Reactor線程都會有自己的Selector、線程和分發(fā)的循環(huán)邏輯。
④當(dāng)有I/O事件就緒時,相關(guān)的subReactor就將事件派發(fā)給響應(yīng)的處理器處理。注意,這里subReactor線程只負責(zé)完成I/O的read()操作,在讀取到數(shù)據(jù)后將業(yè)務(wù)邏輯的處理放入到線程池中完成,若完成業(yè)務(wù)邏輯后需要返回數(shù)據(jù)給客戶端,則相關(guān)的I/O的write操作還是會被提交回subReactor線程來完成。
注意,所以的I/O操作(包括,I/O的accept()、read()、write()以及connect()操作)依舊還是在Reactor線程(mainReactor線程 或 subReactor線程)中完成的。Thread Pool(線程池)僅用來處理非I/O操作的邏輯。
多Reactor線程模式將“接受客戶端的連接請求”和“與該客戶端的通信”分在了兩個Reactor線程來完成。mainReactor完成接收客戶端連接請求的操作,它不負責(zé)與客戶端的通信,而是將建立好的連接轉(zhuǎn)交給subReactor線程來完成與客戶端的通信,這樣一來就不會因為read()數(shù)據(jù)量太大而導(dǎo)致后面的客戶端連接請求得不到即時處理的情況。并且多Reactor線程模式在海量的客戶端并發(fā)請求的情況下,還可以通過實現(xiàn)subReactor線程池來將海量的連接分發(fā)給多個subReactor線程,在多核的操作系統(tǒng)中這能大大提升應(yīng)用的負載和吞吐量。
Netty服務(wù)端使用了“多Reactor線程模式”
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/77728.html
摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...
摘要:是什么是一個異步的,事件驅(qū)動的網(wǎng)絡(luò)編程框架。責(zé)任鏈模式通過將組裝起來,通過向里添加來監(jiān)聽處理發(fā)生的事件。相比于的的不僅易用,而且還支持自動擴容。入站入站事件一般是由外部觸發(fā)的,如收到數(shù)據(jù)。 netty是什么? netty是一個異步的,事件驅(qū)動的網(wǎng)絡(luò)編程框架。 netty的技術(shù)基礎(chǔ) netty是對Java NIO和Java線程池技術(shù)的封裝 netty解決了什么問題 使用Java IO進行...
摘要:如果什么事都沒得做,它也不會死循環(huán),它會將線程休眠起來,直到下一個事件來了再繼續(xù)干活,這樣的一個線程稱之為線程。而請求處理邏輯既可以使用單獨的線程池進行處理,也可以跟放在讀寫線程一塊處理。 Netty到底是什么 從HTTP說起 有了Netty,你可以實現(xiàn)自己的HTTP服務(wù)器,F(xiàn)TP服務(wù)器,UDP服務(wù)器,RPC服務(wù)器,WebSocket服務(wù)器,Redis的Proxy服務(wù)器,MySQL的P...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會被注冊在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會被注冊在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
閱讀 2324·2019-08-30 15:54
閱讀 2045·2019-08-30 13:49
閱讀 728·2019-08-29 18:44
閱讀 880·2019-08-29 18:39
閱讀 1168·2019-08-29 15:40
閱讀 1589·2019-08-29 12:56
閱讀 3214·2019-08-26 11:39
閱讀 3160·2019-08-26 11:37