亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

ChannelPipeline 和 ChannelHandler

William_Sang / 3158人閱讀

摘要:概念與概念一致用以連接設(shè)備文件等的紐帶例如將網(wǎng)絡(luò)的讀寫客戶端發(fā)起連接主動(dòng)關(guān)閉連接鏈路關(guān)閉獲取通信雙方的網(wǎng)絡(luò)地址等的類型主要有兩種非阻塞以及阻塞數(shù)據(jù)傳輸類型有兩種按事件消息傳遞以及按字節(jié)傳遞適用方類型也有兩種服務(wù)器以及客戶端還有一些根據(jù)傳輸協(xié)

ChannelHandler Channel

Channel 概念與 java.nio.channel 概念一致, 用以連接IO設(shè)備 (socket, 文件等) 的紐帶. 例如將網(wǎng)絡(luò)的讀、寫, 客戶端發(fā)起連接, 主動(dòng)關(guān)閉連接, 鏈路關(guān)閉, 獲取通信雙方的網(wǎng)絡(luò)地址等.

Channel 的 IO 類型主要有兩種: 非阻塞IO (NIO) 以及阻塞IO(OIO).

數(shù)據(jù)傳輸類型有兩種: 按事件消息傳遞 (Message) 以及按字節(jié)傳遞 (Byte).

適用方類型也有兩種: 服務(wù)器(ServerSocket) 以及客戶端(Socket). 還有一些根據(jù)傳輸協(xié)議而制定的的Channel, 如: UDT、SCTP等.

Netty 按照類型逐層設(shè)計(jì)相應(yīng)的類. 最底層的為抽象類 AbstractChannel, 再以此根據(jù)IO類型、數(shù)據(jù)傳輸類型、適用方類型實(shí)現(xiàn). 類圖可以一目了然, 如下圖所示:

Channel 狀態(tài)

channelRegistered 狀態(tài)

/**
 * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
 */
void channelRegistered(ChannelHandlerContext ctx) throws Exception;

從注釋里面可以看到是在 Channel 綁定到 Eventloop 上面的時(shí)候調(diào)用的.

不管是 Server 還是 Client, 綁定到 Eventloop 的時(shí)候, 最終都是調(diào)用 Abstract.initAndRegister() 這個(gè)方法上(Server是在 AbstractBootstrap.doBind() 的時(shí)候調(diào)用的, Client 是在 Bootstrap.doConnect() 的時(shí)候調(diào)用的).

initAndRegister() 方法定義如下:

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 把channel綁定到Eventloop對象上面去
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

繼續(xù)跟蹤下去會(huì)定位到 AbstractChannel.AbstractUnsafe.register0() 方法上.

        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 做實(shí)際的綁定動(dòng)作。把Channel感興趣的事件注冊到Eventloop.selector上面.具體實(shí)現(xiàn)在Abstract.doRegister()方法內(nèi)
                doRegister();
                neverRegistered = false;
                registered = true;

                // 通過pipeline的傳播機(jī)制,觸發(fā)handlerAdded事件
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                
                // 通過pipeline的傳播機(jī)制,觸發(fā)channelRegistered事件
                pipeline.fireChannelRegistered();

                // 還沒有綁定,所以這里的 isActive() 返回false.
                if (isActive()) {
                    if (firstRegistration) {
                        // 如果當(dāng)前鏈路已經(jīng)激活,則調(diào)用channelActive()方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

從上面的代碼也可以看出, 在調(diào)用完 pipeline.fireChannelRegistered() 之后, 緊接著會(huì)調(diào)用 isActive() 判斷當(dāng)前鏈路是否激活, 如果激活了則會(huì)調(diào)用 pipeline.fireChannelActive() 方法.

這個(gè)時(shí)候, 對于 Client 和 Server 都還沒有激活, 所以, 這個(gè)時(shí)候不管是 Server 還是 Client 都不會(huì)調(diào)用 pipeline.fireChanenlActive() 方法.

channelActive 狀態(tài)

從啟動(dòng)器的 bind() 接口開始, 往下調(diào)用 doBind() 方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化及注冊
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 調(diào)用 doBind0
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ....
    }
}

doBind 方法又會(huì)調(diào)用 doBind0() 方法, 在 doBind0() 方法中會(huì)通過 EventLoop 去執(zhí)行 channelbind()任務(wù).

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 調(diào)用channel.bind接口
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

doBind0() 方法往下會(huì)調(diào)用到 pipeline.bind(localAddress, promise); 方法, 通過 pipeline 的傳播機(jī)制, 最終會(huì)調(diào)用到 AbstractChannel.AbstractUnsafe.bind() 方法, 這個(gè)方法主要做兩件事情:

調(diào)用 doBind(): 調(diào)用底層JDK API進(jìn)行 Channel 的端口綁定.

調(diào)用 pipeline.fireChannelActive().

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
    ....
    
    // wasActive 在綁定成功前為 false
    boolean wasActive = isActive();
    try {
        // 調(diào)用doBind()調(diào)用JDK底層API進(jìn)行端口綁定
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // 完成綁定之后,isActive() 返回true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 觸發(fā)channelActive事件
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

也就是說當(dāng)有新客戶端連接的時(shí)候, 會(huì)變成活動(dòng)狀態(tài).

channelInactive 狀態(tài)

fireChannelnactive() 方法在兩個(gè)地方會(huì)被調(diào)用: Channel.close()Channel.disconnect().

在調(diào)用前會(huì)先確認(rèn)狀態(tài)是從 Active--->Inactive.

channelUnregistered 狀態(tài)

fireChannelUnregistered() 方法是在 ChannelEventloop 中解除注冊的時(shí)候被調(diào)用的. Channel.close() 的時(shí)候被觸發(fā)執(zhí)行.

ChannelHandler 的生命周期

handlerAdded(): 添加到 ChannelPipeline 時(shí)調(diào)用.
handlerRemoved(): 從 ChannelPipeline 中移除時(shí)調(diào)用.
exceptionCaught(): 處理過程中在 ChannelPipeline 中有錯(cuò)誤產(chǎn)生時(shí)調(diào)用.

處理 I/O 事件或截獲 I/O 操作, 并將其轉(zhuǎn)發(fā)到 ChannelPipeline 中的下一個(gè)處理程序. ChannelHandler 本身不提供許多方法, 但通常必須實(shí)現(xiàn)其子類型之一:

ChannelInboundHandler: 處理入站數(shù)據(jù)以及各種狀態(tài)變化.

ChannelOutboundHandler: 處理出站數(shù)據(jù)并且允許攔截所有的操作.

ChannelInboundHandler 接口

channelRegistered(): 當(dāng) Channel 已經(jīng)注冊到它的 EventLoop 并且能夠處理 I/O 時(shí)被調(diào)用.
channelUnregistered(): 當(dāng) Channel 從他的 EventLoop 注銷并且無法處理任何 I/O 時(shí)被調(diào)用.
channelActive(): 當(dāng) Channel 處于活動(dòng)狀態(tài)時(shí)被調(diào)用.
channelInactive(): 當(dāng) Channel 離開活動(dòng)狀態(tài)并且不再連接遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用.
channelRead(): 當(dāng)從 Channel 讀取數(shù)據(jù)時(shí)被調(diào)用.
channelReadComplete(): 當(dāng) Channel 上的一個(gè)讀操作完成時(shí)被調(diào)用. 當(dāng)所有可讀字節(jié)都從 Channel 中讀取之后, 將會(huì)調(diào)用該回調(diào)方法.

ChannelOutboundHandler 接口

出站操作和數(shù)據(jù)將由 ChannelOutboundHandler 處理. 它的方法將被 Channel ChannelPipeline 以及 ChannelHandlerContext 調(diào)用.

ChannelOutboundHandler 的一個(gè)強(qiáng)大的功能是可以按需推遲操作或事件, 這使得可以通過一些復(fù)雜的方法來處理請求. 例如, 如果到遠(yuǎn)程節(jié)點(diǎn)的寫入被暫停, 那么你可以推遲刷新操作并在稍后繼續(xù).

connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise): 當(dāng)請求將 Channel 連接到遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用.
disconnect(ChannelHandlerContext ctx, ChannelPromise promise): 當(dāng)請求將 Channel 從遠(yuǎn)程節(jié)點(diǎn)斷開時(shí)被調(diào)用.
deregister(ChannelHandlerContext ctx, ChannelPromise promise): 當(dāng)請求將 Channel 從它的 EventLoop 注銷時(shí)被調(diào)用.
read(ChannelHandlerContext ctx): 當(dāng)請求從 Channel 讀取更多的數(shù)據(jù)時(shí)被調(diào)用.
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise): 當(dāng)請求通過 Channel 將數(shù)據(jù)寫到遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用.
flush(ChannelHandlerContext ctx): 當(dāng)請求從 Channel 將入隊(duì)數(shù)據(jù)沖刷到遠(yuǎn)程節(jié)點(diǎn)時(shí)被調(diào)用.

ChannelPromise 和 ChannelFuture

ChannelFuture 表示 Channel 中異步I/O操作的結(jié)果, 在 netty 中所有的 I/O 操作都是異步的, I/O 的調(diào)用會(huì)直接返回, 可以通過 ChannelFuture 來獲取 I/O 操作的結(jié)果或者狀態(tài)信息.

當(dāng) I/O 操作開始時(shí), 將創(chuàng)建一個(gè)新對象. 新的對象是未完成的-它既沒有成功, 也沒有失敗, 也沒有被取消, 因?yàn)?I/O 操作還沒有完成.

如果 I/O 操作已成功完成(失敗或取消), 則對象將標(biāo)記為已完成, 其中包含更具體的信息, 例如故障原因.

請注意, 即使失敗和取消屬于已完成狀態(tài).

ChannelPromiseChannelFuture 的一個(gè)子接口, 其定義了一些可寫的方法, 如 setSuccess()setFailure(), 從而使 ChannelFuture 不可變.

優(yōu)先使用addListener(GenericFutureListener),而非await()

當(dāng)做了一個(gè) I/O 操作并有任何后續(xù)任務(wù)的時(shí)候, 推薦優(yōu)先使用 addListener(GenericFutureListener) 的方式來獲得通知, 而非 await()

addListener(GenericFutureListener) 是非阻塞的. 它會(huì)把特定的 ChannelFutureListener 添加到 ChannelFuture 中, 然后 I/O 線程會(huì)在 I/O 操作相關(guān)的 future 完成的時(shí)候通知監(jiān)聽器.

ChannelFutureListener 會(huì)利于最佳的性能和資源的利用, 因?yàn)樗稽c(diǎn)阻塞都沒有. 而且不會(huì)造成死鎖.

ChannelHandler 適配器

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 這兩個(gè)適配器類分別提供了
ChannelInboundHandlerChannelOutboundHandler 的基本實(shí)現(xiàn), 它們繼承了共同的父接口
ChannelHandler 的方法, 擴(kuò)展了抽象類 ChannelHandlerAdapter.

ChannelHandlerAdapter 還提供了實(shí)用方法 isSharable().

如果其對應(yīng)的實(shí)現(xiàn)被標(biāo)注為 Sharable, 那么這個(gè)方法將返回 true, 表示它可以被添加到多個(gè) ChannelPipeline 中.

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 中所提供的方法體調(diào)用了其相關(guān)聯(lián)的 ChannelHandlerContext 上的等效方法, 從而將事件轉(zhuǎn)發(fā)到了 ChannelPipeline 中的 ChannelHandler 中.

ChannelPipeline 接口

ChannelPipeline 將多個(gè) ChannelHandler 鏈接在一起來讓事件在其中傳播處理 (通過擴(kuò)展
ChannelInitializer). 一個(gè) ChannelPipeline 中可能不僅有入站處理器, 還有出站處理器, 入站處理器只會(huì)處理入站的事件, 而出站處理器只會(huì)處理出站的數(shù)據(jù).

每一個(gè)新創(chuàng)建的 Channel 都將會(huì)分配一個(gè)新的 ChannelPipeline, 不能附加另一個(gè) ChannelPipeline, 也不能分離當(dāng)前的.

通過調(diào)用 ChannelHandlerContext 實(shí)現(xiàn), 它將被轉(zhuǎn)發(fā)給同一個(gè)超類型的下一個(gè) ChannelHandler.

從事件途徑 ChannelPilpeline 的角度來看, ChannelPipeline 的頭部和尾端取決于該事件是入站的還是出站的.

而 Netty 總是將 ChannelPilpeline 的入站口 (左側(cè)) 作為頭部, 將出站口 (右側(cè)) 作為尾端.

當(dāng)通過調(diào)用 ChannelPilpeline.add*() 方法將入站處理器和出站處理器混合添加到 ChannelPilpeline 之后, 每一個(gè) ChannelHandler 從頭部到尾端的順序就是我們添加的順序.

ChannelPilpeline 傳播事件時(shí), 它會(huì)測試 ChannelPilpeline 中的下一個(gè) ChannelHandler 的類型是否和事件的運(yùn)動(dòng)方向相匹配. 如果不匹配, ChannelPilpeline 將跳過該 ChannelHandler 并前進(jìn)到下一個(gè), 直到它找到和該事件期望的方向相匹配的為止.

修改 ChannelPipeline

這里指修改 ChannelPipeline 中的 ChannelHandler 的編排.

通過調(diào)用 ChannelPipeline 上的相關(guān)方法, ChannelHandler 可以添加, 刪除或者替換其他的 ChannelHandler, 從而實(shí)時(shí)地修改 ChannelPipeline 的布局.

addFirst  // 將 ChannelHandler 插入第一個(gè)位置
addBefore // 在某個(gè) ChannelHandler 之前添加一個(gè)
addAfter  // 在某個(gè) ChannelHandler 之后添加一個(gè)
addLast   // 將 ChannelHandler 插入最后一個(gè)位置
remove    // 移除某個(gè) ChannelHandler
replace   // 將某個(gè) ChannelHandler 替換成指定 ChannelHandler
ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChanelHandlerChannelPipeline 之間的關(guān)聯(lián), 每當(dāng)有 ChanelHandler 添加到 ChannelPipeline 中, 都會(huì)創(chuàng)建 ChannelHandlerContext.

ChannelHandlerContext 的主要功能是管理它所關(guān)聯(lián)的 ChannelPipeline 和同一個(gè) ChannelPipeline 中的其他 ChanelHandler 之間的交互.

ChannelHandlerContext 有很多的方法, 其中一些方法也存在于 ChannelChannelPipeline 上, 但是有一點(diǎn)重要的不同.

如果調(diào)用 ChannelChannelPipeline 上的這些方法將沿著 ChannelPipeline 進(jìn)行傳播(從頭或尾開始).
而調(diào)用位于 ChannelHandlerContext 上的相同方法, 則將從當(dāng)前所關(guān)聯(lián)的 ChannelHandler 開始, 并且只會(huì)傳播給位于該 ChannelPipeline 中的下一個(gè)能夠處理該事件的 ChannelHandler.

這樣做可以減少 ChannelHandler 的調(diào)用開銷.

使用 ChannelHandlerContext

上圖為 Channel ChannelPipeline ChannelHandler 以及 ChannelHandlerContext 之間的關(guān)系.

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/74384.html

相關(guān)文章

  • Netty-ChannelHandler-ChannelPipeline

    摘要:只有在詳盡的測試之后才應(yīng)設(shè)置為這值使用的默認(rèn)采樣率檢測并報(bào)告任何發(fā)現(xiàn)的泄漏。這是默認(rèn)級別,適合絕大部分情況使用默認(rèn)的采樣率,報(bào)告所發(fā)現(xiàn)的任何的泄漏以及對應(yīng)的消息被訪問的位置類似于但是其將會(huì)對每次對消息的訪問都進(jìn)行采樣。 ChannelHandler Channel生命周期 狀態(tài) 描述 ChannelUnregistered Channel已經(jīng)被創(chuàng)建,但未注冊到EventLoo...

    warkiz 評論0 收藏0
  • Netty組件入門學(xué)習(xí)

    摘要:可以用來接收入站事件和數(shù)據(jù),隨后使用應(yīng)用程序的業(yè)務(wù)邏輯進(jìn)行處理。因?yàn)橛脩舨⒉皇顷P(guān)心所有的事件,因此提供了抽象類和。抽象類最常見的一個(gè)情況,你的應(yīng)用程序會(huì)利用一個(gè)來接受解碼消息,并對該數(shù)據(jù)應(yīng)用業(yè)務(wù)邏輯。 Channel、EventLoop和ChannelFuture Channel——Socket; EventLoop——控制流、多線程處理、并發(fā) ChannelFuture異步通知 ...

    qpal 評論0 收藏0
  • Netty 框架總結(jié)「ChannelHandler 及 EventLoop」

    摘要:隨著狀態(tài)發(fā)生變化,相應(yīng)的產(chǎn)生。這些被轉(zhuǎn)發(fā)到中的來采取相應(yīng)的操作。當(dāng)收到數(shù)據(jù)或相關(guān)的狀態(tài)改變時(shí),這些方法被調(diào)用,這些方法和的生命周期密切相關(guān)。主要由一系列組成的。采用的線程模型,在同一個(gè)線程的中處理所有發(fā)生的事。 「博客搬家」 原地址: 簡書 原發(fā)表時(shí)間: 2017-05-05 學(xué)習(xí)了一段時(shí)間的 Netty,將重點(diǎn)與學(xué)習(xí)心得總結(jié)如下,本文主要總結(jié)ChannelHandler 及 E...

    VioletJack 評論0 收藏0
  • Netty 源碼分析之 二 貫穿Netty 的大動(dòng)脈 ── ChannelPipeline (一)

    摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...

    tunny 評論0 收藏0
  • Netty學(xué)習(xí)筆記(二)

    摘要:支持很多協(xié)議,并且提供用于數(shù)據(jù)處理的容器。我們已經(jīng)知道由特定事件觸發(fā)??蓪S糜趲缀跛械膭?dòng)作,包括將一個(gè)對象轉(zhuǎn)為字節(jié)或相反,執(zhí)行過程中拋出的異常處理。提供了一個(gè)容器給鏈并提供了一個(gè)用于管理沿著鏈入站和出站事件的流動(dòng)。子類通過進(jìn)行注冊。 前兩天寫了一點(diǎn)netty相關(guān)的知識(shí),并寫了一個(gè)demo,但是對其原理還是沒有深入,今天我們來做一次研究吧 首先讓我們來認(rèn)識(shí)一下netty的幾個(gè)核心人物吧...

    0x584a 評論0 收藏0

發(fā)表評論

0條評論

最新活動(dòng)
閱讀需要支付1元查看
<