摘要:綁定完成后允許套接字進(jìn)行連接并等待連接。服務(wù)端根據(jù)報(bào)文返回響應(yīng),并關(guān)閉連接。單線程服務(wù)器多進(jìn)程及多線程服務(wù)器復(fù)用服務(wù)器復(fù)用的多線程服務(wù)器單線程服務(wù)器一次只處理一個(gè)請(qǐng)求,直到其完成為止。
前言
本篇文章將涉及以下內(nèi)容:
IO實(shí)現(xiàn)Java Socket通信
NIO實(shí)現(xiàn)Java Socket通信
閱讀本文之前最好了解過(guò):
Java IO
Java NIO
Java Concurrency
TCP/IP協(xié)議
TCP 套接字TCP套接字是指IP號(hào)+端口號(hào)來(lái)識(shí)別一個(gè)應(yīng)用程序,從而實(shí)現(xiàn)端到端的通訊。其實(shí)一個(gè)套接字也可以被多個(gè)應(yīng)用程序使用,但是通常來(lái)說(shuō)承載的是一個(gè)應(yīng)用程序的流量。建立在TCP連接之上最著名的協(xié)議為HTTP,我們?nèi)粘I钪惺褂玫臑g覽器訪問(wèn)網(wǎng)頁(yè)通常都是使用HTTP協(xié)議來(lái)實(shí)現(xiàn)的。
先來(lái)了解一下通過(guò)TCP套接字實(shí)現(xiàn)客戶端和服務(wù)器端的通信。
在TCP客戶端發(fā)出請(qǐng)求之前,服務(wù)器會(huì)創(chuàng)建新的套接字(socket),并將套接字綁定到某個(gè)端口上去(bind),默認(rèn)情況下HTTP服務(wù)的端口號(hào)為80。綁定完成后允許套接字進(jìn)行連接(listen)并等待連接(accept)。這里的accept方法會(huì)掛起當(dāng)前的進(jìn)程直到有Socket連接。
在服務(wù)器準(zhǔn)備就緒后,客戶端就可以發(fā)起Socket連接??蛻舳双@取服務(wù)器的Socket套接字(IP號(hào):端口號(hào)),并新建一個(gè)本地的套接字。然后連同本地的套接字發(fā)送到服務(wù)器上。
服務(wù)器accept該請(qǐng)求并讀取該請(qǐng)求。這里面包括有TCP的三次連接過(guò)程。連接建立之后,客戶端發(fā)送HTTP請(qǐng)求并等待響應(yīng)。服務(wù)端根據(jù)HTTP報(bào)文返回響應(yīng),并關(guān)閉連接。
Web Server當(dāng)下的Web服務(wù)器能夠同時(shí)支持?jǐn)?shù)千條連接,一個(gè)客戶端可能向服務(wù)器打開(kāi)一條或多條連接,這些連接的使用狀態(tài)各不相同,使用率也差異很大。如何有效的利用服務(wù)器資源提供低延時(shí)的服務(wù)成了每個(gè)服務(wù)器都需要考慮的問(wèn)題。根據(jù)服務(wù)器的處理方式,可以分為以下4種服務(wù)器,我們也將分別對(duì)其進(jìn)行簡(jiǎn)單的實(shí)現(xiàn)。
單線程服務(wù)器
多進(jìn)程及多線程服務(wù)器
復(fù)用IO服務(wù)器
復(fù)用的多線程服務(wù)器
單線程服務(wù)器一次只處理一個(gè)請(qǐng)求,直到其完成為止。一個(gè)事務(wù)處理結(jié)束后,才會(huì)去處理下一條連接。實(shí)現(xiàn)簡(jiǎn)單,但是性能堪憂。
多進(jìn)程及多線程服務(wù)器可以根據(jù)需要?jiǎng)?chuàng)建,或預(yù)先創(chuàng)建一下線程/進(jìn)程??梢詾槊織l連接分配一個(gè)線程/進(jìn)程。但是當(dāng)強(qiáng)求數(shù)量過(guò)多時(shí),過(guò)多的線程會(huì)導(dǎo)致內(nèi)存和系統(tǒng)資源的浪費(fèi)。
復(fù)用I/O服務(wù)器在復(fù)用結(jié)構(gòu)中,會(huì)同時(shí)監(jiān)視所有連接上的活動(dòng),當(dāng)連接狀態(tài)發(fā)生變化時(shí),就對(duì)那條連接進(jìn)行少量的處理。處理結(jié)束后,就將連接返回到開(kāi)放連接列表中,等待下一次狀態(tài)的變化。之后在有事情可做時(shí)才會(huì)對(duì)連接進(jìn)行處理。在空閑連接上等待的時(shí)候不會(huì)綁定線程和進(jìn)程。
復(fù)用的多線程服務(wù)器多個(gè)線程(對(duì)應(yīng)多個(gè)CPU)中的每一個(gè)都在觀察打開(kāi)的連接(或是打開(kāi)連接中的一個(gè)子集)。并對(duì)每條連接的狀態(tài)變化時(shí)執(zhí)行任務(wù)。
Socket通信基本實(shí)現(xiàn)根據(jù)我們上面講述的Socket通信的步驟,在Java中我們可以按照以下方式逐步建立連接:
首先開(kāi)啟服務(wù)器端的SocketServer并且將其綁定到一個(gè)端口等待Socket連接:
ServerSocket serverSocket = new ServerSocket(PORT_ID:int); Socket socket = serverSocket.accept();
當(dāng)沒(méi)有Socket連接時(shí),服務(wù)器會(huì)在accept方法處阻塞。
然后我們?cè)诳蛻舳诵陆ㄒ粋€(gè)Socket套接字并且連接服務(wù)器:
Socket socket = new Socket(SERVER_SOCKET_IP, SERVER_SOCKET_PORT); socket.setSoTimeout(100000);
如果連接失敗的話,將會(huì)拋出異常說(shuō)明服務(wù)器當(dāng)前不可以使用。
連接成功給的話,客戶端就可以獲取Socket的輸入流和輸出流并發(fā)送消息。寫(xiě)入Socket的輸出流的信息將會(huì)先存儲(chǔ)在客戶端本地的緩存隊(duì)列中,滿足一定條件后會(huì)flush到服務(wù)器的輸入流。服務(wù)器獲取輸入后可以解析輸入的數(shù)據(jù),并且將響應(yīng)內(nèi)容寫(xiě)入服務(wù)器的輸出流并返回客戶端。最后客戶端從輸入流讀取數(shù)據(jù)。
客戶端獲取Socket輸入輸出流,這里將字節(jié)流封裝為字符流。
//獲取Socket的輸出流,用來(lái)發(fā)送數(shù)據(jù)到服務(wù)端 PrintStream out = new PrintStream(socket.getOutputStream()); //獲取Socket的輸入流,用來(lái)接收從服務(wù)端發(fā)送過(guò)來(lái)的數(shù)據(jù) BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream()));
客戶端發(fā)送數(shù)據(jù)并等待響應(yīng)
String str = "hello world"; out.println(str); String echo = buf.readLine(); System.out.println("收到消息:" + echo);
這里需要注意的是,IO流是阻塞式IO,因此在讀取服務(wù)端響應(yīng)的過(guò)程中(即buf.reaLine()這一行)會(huì)阻塞直到收到服務(wù)器響應(yīng)。
客戶端發(fā)送結(jié)束之后不要忘了關(guān)閉IO和Socket通信。
out.close(); buf.close(); socket.close();
服務(wù)器對(duì)消息的處理和客戶端類似,后面會(huì)貼上完整代碼。
Java Socket通信阻塞式通信實(shí)現(xiàn)這里我們對(duì)上述的理論進(jìn)行簡(jiǎn)單的實(shí)現(xiàn)。這里我們實(shí)現(xiàn)一個(gè)簡(jiǎn)單的聊天室,只不過(guò)其中一方是Server角色而另一個(gè)為Client角色。二者都通過(guò)System.in流輸入數(shù)據(jù),并發(fā)送給對(duì)方。正如我們前面所說(shuō),IO流的通信是阻塞式的,因此在等待對(duì)方響應(yīng)的過(guò)程中,進(jìn)程將會(huì)掛起,我們這時(shí)候輸入的數(shù)據(jù)將要等到下一輪會(huì)話中才能被讀取。
client端
import java.io.*; import java.net.Socket; import java.net.SocketTimeoutException; public class SocketClient { public static void send(String server, int port){ try { Socket socket = new Socket(server, port); socket.setSoTimeout(100000); System.out.println("正在連接服務(wù)器"); //從控制臺(tái)讀入數(shù)據(jù) BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); //獲取Socket的輸出流,用來(lái)發(fā)送數(shù)據(jù)到服務(wù)端 PrintStream out = new PrintStream(socket.getOutputStream()); //獲取Socket的輸入流,用來(lái)接收從服務(wù)端發(fā)送過(guò)來(lái)的數(shù)據(jù) BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream())); boolean running = true; while(running){ System.out.print("輸入信息:"); String str = input.readLine(); out.println(str); if("bye".equals(str)){ running = false; }else{ try{ //從服務(wù)器端接收數(shù)據(jù)有個(gè)時(shí)間限制(系統(tǒng)自設(shè),也可以自己設(shè)置),超過(guò)了這個(gè)時(shí)間,便會(huì)拋出該異常 String echo = buf.readLine(); System.out.println("收到消息:" + echo); }catch(SocketTimeoutException e){ System.out.println("Time out, No response"); } } } input.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } finally { } } public static void main(String[] args){ send("127.0.0.1", 2048); } }
Server端
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; public class SocketServer { public static void main(String[] args) throws IOException { //服務(wù)端在2048端口監(jiān)聽(tīng)客戶端請(qǐng)求的TCP連接 ServerSocket server = new ServerSocket(2048); Socket client = null; boolean f = true; while(f){ //等待客戶端的連接,如果沒(méi)有獲取連接 client = server.accept(); System.out.println("與客戶端連接成功!"); //為每個(gè)客戶端連接開(kāi)啟一個(gè)線程 new Thread(new ServerThread(client)).start(); } server.close(); } }
服務(wù)器處理數(shù)據(jù)
import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.Socket; public class ServerThread implements Runnable{ private Socket client = null; public ServerThread(Socket client){ this.client = client; } @Override public void run() { try{ //獲取Socket的輸出流,用來(lái)向客戶端發(fā)送數(shù)據(jù) PrintStream out = new PrintStream(client.getOutputStream()); //獲取Socket的輸入流,用來(lái)接收從客戶端發(fā)送過(guò)來(lái)的數(shù)據(jù) BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream())); BufferedReader serverResponse = new BufferedReader(new InputStreamReader(System.in)); boolean flag =true; while(flag){ //接收從客戶端發(fā)送過(guò)來(lái)的數(shù)據(jù) String str = buf.readLine(); System.out.println("收到消息:" + str); if(str == null || "".equals(str)){ flag = false; }else{ if("bye".equals(str)){ flag = false; }else{ //將接收到的字符串前面加上echo,發(fā)送到對(duì)應(yīng)的客戶端 System.out.print("發(fā)送回復(fù):"); String response = serverResponse.readLine(); out.println(response); } } } out.close(); client.close(); }catch(Exception e){ e.printStackTrace(); } } }
可以和小伙伴試試看,分別啟動(dòng)SocketServer和SocketClient并進(jìn)行通信。不過(guò)前提是你們兩個(gè)需要在一個(gè)局域網(wǎng)中。
Java實(shí)現(xiàn)單線程服務(wù)器上面的服務(wù)器其實(shí)只在主線程監(jiān)聽(tīng)了一個(gè)Socket連接,并在30秒之后將其自動(dòng)關(guān)閉了。我們將實(shí)現(xiàn)一個(gè)經(jīng)典的單線程服務(wù)器。原理和上面相似,這里我們可以直接通過(guò)向服務(wù)器發(fā)送HTTP請(qǐng)求來(lái)驗(yàn)證該服務(wù)器的運(yùn)行。
import java.io.*; import java.net.ServerSocket; import java.net.Socket; public class SingleThreadServer implements Runnable{ private ServerSocket serverSocket; public SingleThreadServer(ServerSocket serverSocket){ this.serverSocket = serverSocket; } @Override public void run() { Socket socket = null; try{ while (!Thread.interrupted()){ socket = serverSocket.accept(); //谷歌瀏覽器每次會(huì)發(fā)送兩個(gè)請(qǐng)求 //一次用于獲取html //一次用于獲取favicon //如果獲取favicon成功就緩存,否則會(huì)一直請(qǐng)求獲得favicon //而火狐瀏覽器第一次也會(huì)發(fā)出這兩個(gè)請(qǐng)求 //在獲得favicon失敗后就不會(huì)繼續(xù)嘗試獲取favicon //因此使用谷歌瀏覽器訪問(wèn)該Server的話,你會(huì)看到 連接成功 被打印兩次 System.out.println("連接成功"); process(socket); } } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } private void process(Socket socket){ try { InputStreamReader inputStreamReader = null; BufferedOutputStream bufferedOutputStream = null; try{ inputStreamReader = new InputStreamReader(socket.getInputStream()); bufferedOutputStream = new BufferedOutputStream(socket.getOutputStream()); //這里無(wú)法正常讀取輸入流,因?yàn)樵跊](méi)有遇到EOF之前,流會(huì)任務(wù)socket輸入尚未結(jié)束,將會(huì)繼續(xù)等待直到socket中斷 //所以這里我們將暫時(shí)不讀取Socket的輸入流中的內(nèi)容。 //int size; //char[] buffer = new char[1024]; //StringBuilder stringBuilder = new StringBuilder(); //while ((size = inputStreamReader.read(buffer)) > 0){ // stringBuilder.append(buffer, 0, size); //} byte[] responseDocument = " Hello World ".getBytes("UTF-8"); byte[] responseHeader = ("HTTP/1.1 200 OK Content-Type: text/html; charset=UTF-8 Content-Length: " + responseDocument.length + " ").getBytes("UTF-8"); bufferedOutputStream.write(responseHeader); bufferedOutputStream.write(responseDocument); }finally { bufferedOutputStream.close(); } } catch (IOException e) { e.printStackTrace(); } }
該服務(wù)器用單一線程處理每次請(qǐng)求,每個(gè)線程都將等待服務(wù)器處理完上一個(gè)請(qǐng)求之后才能獲得響應(yīng)。這里需要注意,純HTTP請(qǐng)求的輸入流的讀取會(huì)遇到輸入流阻塞的問(wèn)題,因?yàn)镠TTP請(qǐng)求并沒(méi)有輸入流可識(shí)別的EOF標(biāo)記。從而導(dǎo)致服務(wù)器一直掛起在讀取輸入流的地方。它的解決方法如下:
客戶端關(guān)閉Socket連接,強(qiáng)制服務(wù)器關(guān)閉該Socket連接。但是同時(shí)也丟失服務(wù)器響應(yīng)
自定義協(xié)議,從而服務(wù)器可以識(shí)別數(shù)據(jù)的終點(diǎn)。
啟動(dòng)服務(wù)器
public static void main(String[] args) throws IOException, InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); ServerSocket serverSocket = new ServerSocket(2048); executorService.execute(new SingleThreadServer(serverSocket)); // TimeUnit.SECONDS.sleep(10); // System.out.println("shut down server"); // executorService.shutdownNow(); }
注意要先關(guān)閉之前占用2048端口號(hào)的服務(wù)器。
我們也可以使用代碼來(lái)測(cè)試:
import java.io.*; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class TestSingleThreadServer { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0 ; i<10 ; i++){ final int threadId = i; executorService.execute(() ->{ try { Socket socket = new Socket("127.0.0.1", 20006); socket.setSoTimeout(5000); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String line = bufferedReader.readLine(); System.out.println(threadId + ":" + line); socket.close();; } catch (IOException e) { e.printStackTrace(); } }); } TimeUnit.SECONDS.sleep(40); executorService.shutdownNow(); } }Java實(shí)現(xiàn)多線程服務(wù)器
這里我們將為每一個(gè)Socket連接提供一個(gè)線程來(lái)處理。基本實(shí)現(xiàn)和上面差不多,只是將每一個(gè)Socket連接丟給一個(gè)額外的線程來(lái)處理。這里可以參考前面的簡(jiǎn)易聊天室來(lái)試著自己實(shí)現(xiàn)以下。
Java NIO實(shí)現(xiàn)復(fù)用服務(wù)器NIO的出現(xiàn)改變了舊式Java讀取IO流的方式。首先,它支持非阻塞式讀取,其次它可以使用一個(gè)線程來(lái)管理多個(gè)信道。多線程表面上看起來(lái)可以同時(shí)處理多個(gè)Socket通信,但是多線程的管理本身也消耗相當(dāng)多的資源。其次,很多信道的使用率往往并不高,一些信道往往并不是連通狀態(tài)中。如果我們可以將資源直接賦予當(dāng)前活躍的Socket通信的話,可以明顯的提高資源利用率。
先附上參考資料將在后序更新。
參考書(shū)籍HTTP權(quán)威指南
Java TCP/IP Socket 編程
Java Multithread servers
Java NIO ServerSocketChannel
想要了解更多開(kāi)發(fā)技術(shù),面試教程以及互聯(lián)網(wǎng)公司內(nèi)推,歡迎關(guān)注我的微信公眾號(hào)!將會(huì)不定期的發(fā)放福利哦~
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/68829.html
摘要:而用于主線程池的屬性都定義在中本篇只是簡(jiǎn)單介紹了一下引導(dǎo)類的配置屬性,下一篇我將詳細(xì)介紹服務(wù)端引導(dǎo)類的過(guò)程分析。 從Java1.4開(kāi)始, Java引入了non-blocking IO,簡(jiǎn)稱NIO。NIO與傳統(tǒng)socket最大的不同就是引入了Channel和多路復(fù)用selector的概念。傳統(tǒng)的socket是基于stream的,它是單向的,有InputStream表示read和Outpu...
摘要:關(guān)閉套接字和上下文備注說(shuō)明如何利用使用首先下載所需的包,解壓以后將和文件放到自己電腦中的安裝路徑中的文件夾下,最后需要將之前解壓后的包放在項(xiàng)目的中或者資源下載鏈接密碼項(xiàng)目源碼下載鏈接鏈接密碼 在講ZeroMQ前先給大家講一下什么是消息隊(duì)列。 消息隊(duì)列簡(jiǎn)介: 消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量削鋒等問(wèn)題。實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。是...
摘要:流控制通常就是在客戶端的頁(yè)面使用一個(gè)隱藏的窗口向服務(wù)端發(fā)出一個(gè)長(zhǎng)連接的請(qǐng)求。和長(zhǎng)鏈接以上幾種服務(wù)器推的技術(shù)中長(zhǎng)輪詢和流控制其實(shí)都是基于長(zhǎng)鏈接來(lái)實(shí)現(xiàn)的,也就是中所謂的。通信協(xié)議于年被定為標(biāo)準(zhǔn),并被所補(bǔ)充規(guī)范。 初探WebSocket node websocket socket.io 我們平常開(kāi)發(fā)的大部分web頁(yè)面都是主動(dòng)‘拉’的形式,如果需要更新頁(yè)面內(nèi)容,則需要刷新一個(gè),但Slack工...
閱讀 940·2021-11-16 11:56
閱讀 1748·2021-11-16 11:45
閱讀 3256·2021-10-08 10:13
閱讀 4201·2021-09-22 15:27
閱讀 785·2019-08-30 11:03
閱讀 707·2019-08-30 10:56
閱讀 1012·2019-08-29 15:18
閱讀 1793·2019-08-29 14:05