摘要:完成客戶端服務(wù)器通信,需要基于協(xié)議之上,自定義一套簡單的通信協(xié)議,其中數(shù)據(jù)交換方式需要使用自定義幀。輸入數(shù)據(jù)處理器以下為輸入數(shù)據(jù)的第一個(gè)處理器,可以保證無論幀經(jīng)歷怎樣的粘包拆包,均可以準(zhǔn)確提取每一個(gè)自定義幀的數(shù)據(jù)部分。
「博客搬家」 原地址: 簡書 原發(fā)表時(shí)間: 2017-03-26
本文采用 Netty 這一最流行的 Java NIO 框架,作為 Java 服務(wù)器通信部分的基礎(chǔ)框架,探索使用一個(gè)通道、一臺服務(wù)器對多個(gè)客戶端提供服務(wù)。
完成客戶端 - 服務(wù)器通信,需要基于 TCP 協(xié)議之上,自定義一套簡單的通信協(xié)議,其中數(shù)據(jù)交換方式需要使用自定義幀。為實(shí)現(xiàn)以上方案,本文采用 Netty 框架實(shí)現(xiàn) Java 服務(wù)器的通信部分。
Netty 是由 JBoss 提供的一個(gè) Java開源 框架。Netty 提供異步的、事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。也就是說,Netty 是一個(gè)基于 NIO 的客戶、服務(wù)器端編程框架,使用Netty 可以確保你快速和簡單的開發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用,例如實(shí)現(xiàn)了某種協(xié)議的客戶,服務(wù)端應(yīng)用。Netty 相當(dāng)簡化和流線化了網(wǎng)絡(luò)應(yīng)用的編程開發(fā)過程,例如,TCP 和 UDP 的 socket 服務(wù)開發(fā)。
本項(xiàng)目的硬件設(shè)備集群使用 CAN 總線作為通信協(xié)議,硬件設(shè)備產(chǎn)生的數(shù)據(jù)和工作人員的控制指令均由服務(wù)器后端應(yīng)用程序處理并存儲。由于服務(wù)器并未原生支持 CAN 總線,故為了方便起見,使用「CAN轉(zhuǎn)以太網(wǎng)」模塊作為 CAN 協(xié)議和 TCP 協(xié)議交換的中介,以謀求實(shí)現(xiàn)的簡單化。項(xiàng)目總體架構(gòu)圖如下圖所示:
CAN - bus,即控制器局域網(wǎng),是國際上應(yīng)用最廣泛的現(xiàn)場總線之一。1. Netty 框架的學(xué)習(xí)作為一種技術(shù)先進(jìn)、可靠性高、功能完善、成本合理的遠(yuǎn)程網(wǎng)絡(luò)通信控制方式,CAN - bus 已經(jīng)被廣泛應(yīng)用到各個(gè)自動化控制系統(tǒng)中。從高速的網(wǎng)絡(luò)到低價(jià)位的多路接線都可以使用 CAN - bus。例如,在汽車電子、自動控制、智能大廈、電路系統(tǒng)、安防監(jiān)控等領(lǐng)域。
以下提供幾篇不錯(cuò)的文章,幫助大家學(xué)習(xí) Netty 這一頗受矚目的框架。
《Netty in Action》中文版 - 并發(fā)編程網(wǎng)
Essential Netty in Action -《Netty 實(shí)戰(zhàn)(精髓)》
Netty 4.x User Guide 中文翻譯《Netty 4.x 用戶指南》
2. Bootstrapping 服務(wù)器方案以下代碼是 Bootstrapping 服務(wù)器的實(shí)現(xiàn)方案:
public class KyServer{ private SuccessfulListener launchListener; private SuccessfulListener finishListener; private NioEventLoopGroup group; public void start() { new Thread(() -> { group = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializerTest()); ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(30232)); channelFuture.addListener( (ChannelFutureListener) future -> startListenerHandle(future, launchListener)); }).start(); } private void startListenerHandle(Future future, SuccessfulListener listener) { if (!future.isSuccess()) future.cause().printStackTrace(); if (listener != null) listener.onSuccess(future.isSuccess()); } public void setLaunchSuccessfulListener( SuccessfulListener successfulListener) { this.launchListener = successfulListener; } public void setFinishSuccessfulListener( SuccessfulListener finishListener) { this.finishListener = finishListener; } public void shutdown() { if (group != null) { Future> futureShutdown = group.shutdownGracefully(); futureShutdown.addListener(future -> startListenerHandle(future, finishListener)); } } }2.1 Bootstrapping 服務(wù)器的設(shè)計(jì)要點(diǎn)
創(chuàng)建一個(gè) ServerBootstrap 實(shí)例來啟動和綁定服務(wù)器
創(chuàng)建并且分配一個(gè) NioEventLoopgroup 實(shí)例來處理 event,比如接受新的連接和讀/寫數(shù)據(jù)
指定本地 InetSocketAddress 到服務(wù)器綁定的端口
用 ChannelHandler 實(shí)例來初始化 Channel
調(diào)用 ServerBootstrap.bind() 來綁定服務(wù)器
2.2 服務(wù)器監(jiān)聽器的設(shè)計(jì)「觀察者模式」首先在該類中設(shè)置成員變量:
private SuccessfulListener launchListener; private SuccessfulListener finishListener;
而后添加該變量的 set 方法,以及監(jiān)聽器的處理方法:
private void startListenerHandle(Future future, SuccessfulListener listener) { if (!future.isSuccess()) future.cause().printStackTrace(); if (listener != null) listener.onSuccess(future.isSuccess()); } public void setLaunchSuccessfulListener(SuccessfulListener successfulListener) { this.launchListener = successfulListener; } public void setFinishSuccessfulListener(SuccessfulListener finishListener) { this.finishListener = finishListener; }
在服務(wù)器啟動監(jiān)聽時(shí),執(zhí)行如下代碼
ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(30232)); channelFuture.addListener(future -> startListenerHandle(future, launchListener));
在外部關(guān)閉服務(wù)器時(shí),執(zhí)行該方法:
public void shutdown() { if (group != null) { Future> futureShutdown = group.shutdownGracefully(); futureShutdown.addListener(future -> startListenerHandle(future, finishListener)); } }
通過如上方法,外部操作者可以方便得知服務(wù)器是否啟動成功以及是否結(jié)束成功,使用觀察者模式,完美實(shí)現(xiàn)了對服務(wù)器啟動及關(guān)閉的監(jiān)聽。
3. 服務(wù)器業(yè)務(wù)邏輯的實(shí)現(xiàn)首先使用初始化方法 ServerChannelInitializer 完成所有 ChannelHandler 對 Channel 的綁定操作:
public class ServerChannelInitializer extends ChannelInitializer3.1 輸入數(shù)據(jù)處理方案 3.1.1 自定義幀方案{ @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new LoggingHandler("NO1")); byte head = 0x11; ch.pipeline().addLast(new FrameIdentifierChannelInboundHandler(head)); ch.pipeline().addLast(new ShowByteBufAsFrameInBoundHandler()); } }
自定義幀包括「幀標(biāo)識位」、「數(shù)據(jù)長度」、「數(shù)據(jù)體」,如下圖所示,:
幀標(biāo)識位:0x11。
數(shù)據(jù)長度:兩個(gè)字節(jié),可表示數(shù)據(jù)部分大小最大為 2 ^ 16 - 1 。
數(shù)據(jù)體:實(shí)際有用的數(shù)據(jù)。
3.1.2 輸入數(shù)據(jù)處理器以下為輸入數(shù)據(jù)的第一個(gè)處理器,可以保證無論 TCP 幀經(jīng)歷怎樣的粘包、拆包,均可以準(zhǔn)確提取每一個(gè)自定義幀的數(shù)據(jù)部分。
/** * 入端自定義幀提取處理器 * 將數(shù)據(jù)流提取出完整的自定義幀并傳入下一個(gè)處理器 */ public class FrameIdentifierChannelInboundHandler extends SimpleChannelInboundHandler{ private byte[] frameHead; private int frameHeadLength; private int frameBodyLength; private FrameReceivedEnum frameStatus = FrameReceivedEnum.READY; private ByteBuf holdByteBuf = Unpooled.buffer(1024); FrameIdentifierChannelInboundHandler(byte... frameHead) { this(); this.frameHead = frameHead; frameHeadLength = frameHead.length; } @Override protected void channelRead0 (ChannelHandlerContext ctx, ByteBuf msg) { //數(shù)據(jù)讀入本地buffer holdByteBuf.writeBytes(msg); while (true) { //若讀取狀態(tài)為: 開始讀取 if (frameStatus == FrameReceivedEnum.READY) { if (!matchFrameHead(holdByteBuf)) { holdByteBuf.clear(); break; } } //若讀取狀態(tài)為: 幀長讀取 //數(shù)據(jù)體完全包含在 buffer 內(nèi),則可通過此狀態(tài) if (frameStatus == FrameReceivedEnum.READING_LENGTH) { if (holdByteBuf.readableBytes() <= 1) break; //無符號 short 需要用 int 型引用 int currentFrameLength = holdByteBuf.getUnsignedShort(holdByteBuf.readerIndex()); //可讀byte數(shù)為長度計(jì)數(shù)(2)+數(shù)據(jù)體長度; 所以當(dāng)前幀長+2 <= 可讀幀長 if (currentFrameLength + 2 <= holdByteBuf.readableBytes()) { frameBodyLength = holdByteBuf.readUnsignedShort(); frameStatus = FrameReceivedEnum.READING_BODY; } else { break; } } //若讀取狀態(tài)為: 數(shù)據(jù)體讀取 //預(yù)設(shè)數(shù)據(jù)體完全包含在buffer內(nèi),否則拋出異常 if (frameStatus == FrameReceivedEnum.READING_BODY) { if (frameBodyLength == 0) { frameStatus = FrameReceivedEnum.READY; frameBodyLength = -1; holdByteBuf.discardReadBytes(); } else if (frameBodyLength > 0) { ByteBuf returnBuf = Unpooled.buffer(frameBodyLength); holdByteBuf.readBytes(returnBuf); frameStatus = FrameReceivedEnum.READY; // ctx.fireChannelRead(returnBuf); ctx.writeAndFlush(returnBuf); frameBodyLength = -1; holdByteBuf.discardReadBytes(); } else { throw new FrameLoadException("自定義幀長度計(jì)數(shù)異常"); } } else { throw new FrameLoadException("自定義幀讀取異常"); } } } private boolean matchFrameHead(ByteBuf byteBuf) { while (true) { if (byteBuf.readableBytes() < frameHeadLength) { return false; } if (frameHead[0] == byteBuf.readByte()) { frameStatus = FrameReceivedEnum.READING_LENGTH; return true; } } } }
以下為第二個(gè)輸入數(shù)據(jù)處理器,可將前一處理器的結(jié)果「優(yōu)雅」打印到控制臺上并原樣發(fā)送至客戶端:
public class ShowByteBufAsFrameInBoundHandler extends SimpleChannelInboundHandler4. 參考鏈接{ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { System.out.println(ByteBufUtil.prettyHexDump(byteBuf)); ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer(msg)); } }
CAN 轉(zhuǎn)以太網(wǎng)設(shè)備介紹
Netty - 百度百科
《Netty in Action》中文版 - 并發(fā)編程網(wǎng)
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/68253.html
摘要:提供異步的事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序??偨Y(jié)我們完成了服務(wù)端的簡單搭建,模擬了聊天會話場景。 之前一直在搞前端的東西,都快忘了自己是個(gè)java開發(fā)。其實(shí)還有好多java方面的東西沒搞過,突然了解到netty,覺得有必要學(xué)一學(xué)。 介紹 Netty是由JBOSS提供的一個(gè)java開源框架。Netty提供異步的、事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用程序框...
摘要:是一個(gè)分布式服務(wù)框架,以及治理方案。手寫注意要點(diǎn)手寫注意要點(diǎn)基于上文中對于協(xié)議的理解,如果我們自己去實(shí)現(xiàn),需要考慮哪些技術(shù)呢其實(shí)基于圖的整個(gè)流程應(yīng)該有一個(gè)大概的理解。基于手寫實(shí)現(xiàn)基于手寫實(shí)現(xiàn)理解了協(xié)議后,我們基于來實(shí)現(xiàn)一個(gè)通信框架。閱讀這篇文章之前,建議先閱讀和這篇文章關(guān)聯(lián)的內(nèi)容。[1]詳細(xì)剖析分布式微服務(wù)架構(gòu)下網(wǎng)絡(luò)通信的底層實(shí)現(xiàn)原理(圖解)[2][年薪60W的技巧]工作了5年,你真的理解N...
摘要:啟動一個(gè)線程,獲取阻塞隊(duì)列的元素,當(dāng)通道發(fā)生事件時(shí),隊(duì)列會被放入事件對象啟動一個(gè)定時(shí)器,每個(gè)執(zhí)行一次,掃描,超時(shí)沒有獲取結(jié)果的會被移除掉客戶端跟服務(wù)器端差不多。而這個(gè)對象會在傳輸之前進(jìn)行編碼,消息接收到進(jìn)行解碼。 rocketMQ通信模塊 Rocketmq的通信層是基于通信框架netty 4.0.21.Final之上做了簡單的協(xié)議封裝,基本的類圖如下: showImg(https://...
摘要:后端好書閱讀與推薦系列文章后端好書閱讀與推薦后端好書閱讀與推薦續(xù)后端好書閱讀與推薦續(xù)二后端好書閱讀與推薦續(xù)三這里依然記錄一下每本書的亮點(diǎn)與自己讀書心得和體會,分享并求拍磚。然后又請求封鎖,當(dāng)釋放了上的封鎖之后,系統(tǒng)又批準(zhǔn)了的請求一直等待。 后端好書閱讀與推薦系列文章:后端好書閱讀與推薦后端好書閱讀與推薦(續(xù))后端好書閱讀與推薦(續(xù)二)后端好書閱讀與推薦(續(xù)三) 這里依然記錄一下每本書的...
閱讀 2035·2021-11-24 11:16
閱讀 3325·2021-09-10 10:51
閱讀 3335·2021-08-03 14:03
閱讀 1333·2019-08-29 17:03
閱讀 3306·2019-08-29 12:36
閱讀 2335·2019-08-26 14:06
閱讀 555·2019-08-23 16:32
閱讀 2844·2019-08-23 13:42