亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(二)

whidy / 427人閱讀

摘要:接上篇源碼分析之三我就是大名鼎鼎的一的處理循環(huán)在中一個(gè)需要負(fù)責(zé)兩個(gè)工作第一個(gè)是作為線程負(fù)責(zé)相應(yīng)的操作第二個(gè)是作為任務(wù)線程執(zhí)行中的任務(wù)接下來我們先從操縱方面入手看一下數(shù)據(jù)是如何從傳遞到我們的中的是模型的一個(gè)實(shí)現(xiàn)并且是基于的那么從的前生今世之四

接上篇Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(一)

Netty 的 IO 處理循環(huán)

在 Netty 中, 一個(gè) EventLoop 需要負(fù)責(zé)兩個(gè)工作, 第一個(gè)是作為 IO 線程, 負(fù)責(zé)相應(yīng)的 IO 操作; 第二個(gè)是作為任務(wù)線程, 執(zhí)行 taskQueue 中的任務(wù). 接下來我們先從 IO 操縱方面入手, 看一下 TCP 數(shù)據(jù)是如何從 Java NIO Socket 傳遞到我們的 handler 中的.

Netty 是 Reactor 模型的一個(gè)實(shí)現(xiàn), 并且是基于 Java NIO 的, 那么從 Java NIO 的前生今世 之四 NIO Selector 詳解 中我們知道, Netty 中必然有一個(gè) Selector 線程, 用于不斷調(diào)用 Java NIO 的 Selector.select 方法, 查詢當(dāng)前是否有就緒的 IO 事件. 回顧一下在 Java NIO 中所講述的 Selector 的使用流程:

通過 Selector.open() 打開一個(gè) Selector.

將 Channel 注冊(cè)到 Selector 中, 并設(shè)置需要監(jiān)聽的事件(interest set)

不斷重復(fù):

調(diào)用 select() 方法

調(diào)用 selector.selectedKeys() 獲取 selected keys

迭代每個(gè) selected key:

1) 從 selected key 中獲取 對(duì)應(yīng)的 Channel 和附加信息(如果有的話)

2) 判斷是哪些 IO 事件已經(jīng)就緒了, 然后處理它們. 如果是 OP_ACCEPT 事件, 則調(diào)用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 并將它設(shè)置為 非阻塞的, 然后將這個(gè) Channel 注冊(cè)到 Selector 中.

3) 根據(jù)需要更改 selected key 的監(jiān)聽事件.

4) 將已經(jīng)處理過的 key 從 selected keys 集合中刪除.

上面的使用流程用代碼來體現(xiàn)就是:

/**
 * @author xiongyongshun
 * @Email yongshun1228@gmail.com
 * @version 1.0
 * @created 16/8/1 13:13
 */
public class NioEchoServer {
    private static final int BUF_SIZE = 256;
    private static final int TIMEOUT = 3000;

    public static void main(String args[]) throws Exception {
        // 打開服務(wù)端 Socket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 打開 Selector
        Selector selector = Selector.open();

        // 服務(wù)端 Socket 監(jiān)聽8080端口, 并配置為非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);

        // 將 channel 注冊(cè)到 selector 中.
        // 通常我們都是先注冊(cè)一個(gè) OP_ACCEPT 事件, 然后在 OP_ACCEPT 到來時(shí), 再將這個(gè) Channel 的 OP_READ
        // 注冊(cè)到 Selector 中.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 通過調(diào)用 select 方法, 阻塞地等待 channel I/O 可操作
            if (selector.select(TIMEOUT) == 0) {
                System.out.print(".");
                continue;
            }

            // 獲取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經(jīng)就緒.
            Iterator keyIterator = selector.selectedKeys().iterator();

            while (keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

                // 當(dāng)獲取一個(gè) SelectionKey 后, 就要將它刪除, 表示我們已經(jīng)對(duì)這個(gè) IO 事件進(jìn)行了處理.
                keyIterator.remove();

                if (key.isAcceptable()) {
                    // 當(dāng) OP_ACCEPT 事件到來時(shí), 我們就有從 ServerSocketChannel 中獲取一個(gè) SocketChannel,
                    // 代表客戶端的連接
                    // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel.
                    // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel.
                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                    clientChannel.configureBlocking(false);
                    //在 OP_ACCEPT 到來時(shí), 再將這個(gè) Channel 的 OP_READ 注冊(cè)到 Selector 中.
                    // 注意, 這里我們?nèi)绻麤]有設(shè)置 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那么 select 方法會(huì)一直直接返回.
                    clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                }

                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    long bytesRead = clientChannel.read(buf);
                    if (bytesRead == -1) {
                        clientChannel.close();
                    } else if (bytesRead > 0) {
                        key.interestOps(OP_READ | SelectionKey.OP_WRITE);
                        System.out.println("Get data length: " + bytesRead);
                    }
                }

                if (key.isValid() && key.isWritable()) {
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    buf.flip();
                    SocketChannel clientChannel = (SocketChannel) key.channel();

                    clientChannel.write(buf);

                    if (!buf.hasRemaining()) {
                        key.interestOps(OP_READ);
                    }
                    buf.compact();
                }
            }
        }
    }
}

還記得不, 上面操作的第一步 通過 Selector.open() 打開一個(gè) Selector 我們已經(jīng)在第一章的 Channel 實(shí)例化 這一小節(jié)中已經(jīng)提到了, Netty 中是通過調(diào)用 SelectorProvider.openSocketChannel() 來打開一個(gè)新的 Java NIO SocketChannel:

private static SocketChannel newSocket(SelectorProvider provider) {
    ...
    return provider.openSocketChannel();
}

第二步 將 Channel 注冊(cè)到 Selector 中, 并設(shè)置需要監(jiān)聽的事件(interest set) 的操作我們?cè)诘谝徽?channel 的注冊(cè)過程 中也分析過了, 我們?cè)趤砘仡櫼幌? 在客戶端的 Channel 注冊(cè)過程中, 會(huì)有如下調(diào)用鏈:

Bootstrap.initAndRegister -> 
    AbstractBootstrap.initAndRegister -> 
        MultithreadEventLoopGroup.register -> 
            SingleThreadEventLoop.register -> 
                AbstractUnsafe.register ->
                    AbstractUnsafe.register0 ->
                        AbstractNioChannel.doRegister

在 AbstractUnsafe.register 方法中調(diào)用了 register0 方法:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 省略條件判斷和錯(cuò)誤處理
    AbstractChannel.this.eventLoop = eventLoop;
    register0(promise);
}

register0 方法代碼如下:

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (firstRegistration && isActive()) {
        pipeline.fireChannelActive();
    }
}

register0 又調(diào)用了 AbstractNioChannel.doRegister:

@Override
protected void doRegister() throws Exception {
    // 省略錯(cuò)誤處理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

在這里 javaChannel() 返回的是一個(gè) Java NIO SocketChannel 對(duì)象, 我們將此 SocketChannel 注冊(cè)到前面第一步獲取的 Selector 中.

那么接下來的第三步的循環(huán)是在哪里實(shí)現(xiàn)的呢? 第三步的操作就是我們今天分析的關(guān)鍵, 下面我會(huì)一步一步向讀者展示出來.

thread 的 run 循環(huán)

在 EventLoop 的啟動(dòng) 一小節(jié)中, 我們已經(jīng)了解到了, 當(dāng) EventLoop.execute 第一次被調(diào)用時(shí), 就會(huì)觸發(fā) startThread() 的調(diào)用, 進(jìn)而導(dǎo)致了 EventLoop 所對(duì)應(yīng)的 Java 線程的啟動(dòng). 接著我們來更深入一些, 來看一下此線程啟動(dòng)后都會(huì)做什么東東吧.
下面是此線程的 run() 方法, 我已經(jīng)把一些異常處理和收尾工作的代碼都去掉了. 這個(gè) run 方法可以說是十分簡(jiǎn)單, 主要就是調(diào)用了 SingleThreadEventExecutor.this.run() 方法. 而 SingleThreadEventExecutor.run() 是一個(gè)抽象方法, 它的實(shí)現(xiàn)在 NioEventLoop 中.

thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                ...
            }
        }
    });

繼續(xù)跟蹤到 NioEventLoop.run() 方法, 其源碼如下:

@Override
protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            ...
        }
    }
}

啊哈, 看到了上面代碼的 for(;;) 所構(gòu)成的死循環(huán)了沒? 原來 NioEventLoop 事件循環(huán)的核心就是這里!
現(xiàn)在我們把上面所提到的 Selector 使用步驟的第三步的部分也找到了.
這個(gè) run 方法可以說是 Netty NIO 的核心, 屬于重中之重, 把它分析明白了, 那么對(duì) Netty 的事件循環(huán)機(jī)制也就了解了大部分了. 讓我們一鼓作氣, 繼續(xù)分析下去吧!

IO 事件的輪詢

首先, 在 run 方法中, 第一步是調(diào)用 hasTasks() 方法來判斷當(dāng)前任務(wù)隊(duì)列中是否有任務(wù):

protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}

這個(gè)方法很簡(jiǎn)單, 僅僅是檢查了一下 taskQueue 是否為空. 至于 taskQueue 是什么呢, 其實(shí)它就是存放一系列的需要由此 EventLoop 所執(zhí)行的任務(wù)列表. 關(guān)于 taskQueue, 我們這里暫時(shí)不表, 等到后面再來詳細(xì)分析它.
當(dāng) taskQueue 不為空時(shí), 就執(zhí)行到了 if 分支中的 selectNow() 方法. 然而當(dāng) taskQueue 為空時(shí), 執(zhí)行的是 select(oldWakenUp) 方法. 那么 selectNow()select(oldWakenUp) 之間有什么區(qū)別呢? 來看一下, selectNow() 的源碼如下:

void selectNow() throws IOException {
    try {
        selector.selectNow();
    } finally {
        // restore wakup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

首先調(diào)用了 selector.selectNow() 方法, 這里 selector 是什么大家還有印象不? 我們?cè)诘谝徽?Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (客戶端) 時(shí)對(duì)它有過介紹, 這個(gè) selector 字段正是 Java NIO 中的多路復(fù)用器 Selector. 那么這里 selector.selectNow() 就很好理解了, selectNow() 方法會(huì)檢查當(dāng)前是否有就緒的 IO 事件, 如果有, 則返回就緒 IO 事件的個(gè)數(shù); 如果沒有, 則返回0. 注意, selectNow() 是立即返回的, 不會(huì)阻塞當(dāng)前線程. 當(dāng) selectNow() 調(diào)用后, finally 語句塊中會(huì)檢查 wakenUp 變量是否為 true, 當(dāng)為 true 時(shí), 調(diào)用 selector.wakeup() 喚醒 select() 的阻塞調(diào)用.

看了 if 分支的 selectNow 方法后, 我們?cè)賮砜匆幌?else 分支的 select(oldWakenUp) 方法.
其實(shí) else 分支的 select(oldWakenUp) 方法的處理邏輯比較復(fù)雜, 而我們這里的目的暫時(shí)不是分析這個(gè)方法調(diào)用的具體工作, 因此我這里長話短說, 只列出我們我們關(guān)注的內(nèi)如:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        ...
        int selectedKeys = selector.select(timeoutMillis);
        ...
    } catch (CancelledKeyException e) {
        ...
    }
}

在這個(gè) select 方法中, 調(diào)用了 selector.select(timeoutMillis), 而這個(gè)調(diào)用是會(huì)阻塞住當(dāng)前線程的, timeoutMillis 是阻塞的超時(shí)時(shí)間.
到來這里, 我們可以看到, 當(dāng) hasTasks() 為真時(shí), 調(diào)用的的 selectNow() 方法是不會(huì)阻塞當(dāng)前線程的, 而當(dāng) hasTasks() 為假時(shí), 調(diào)用的 select(oldWakenUp) 是會(huì)阻塞當(dāng)前線程的.
這其實(shí)也很好理解: 當(dāng) taskQueue 中沒有任務(wù)時(shí), 那么 Netty 可以阻塞地等待 IO 就緒事件; 而當(dāng) taskQueue 中有任務(wù)時(shí), 我們自然地希望所提交的任務(wù)可以盡快地執(zhí)行, 因此 Netty 會(huì)調(diào)用非阻塞的 selectNow() 方法, 以保證 taskQueue 中的任務(wù)盡快可以執(zhí)行.

IO 事件的處理

在 NioEventLoop.run() 方法中, 第一步是通過 select/selectNow 調(diào)用查詢當(dāng)前是否有就緒的 IO 事件. 那么當(dāng)有 IO 事件就緒時(shí), 第二步自然就是處理這些 IO 事件啦.
首先讓我們來看一下 NioEventLoop.run 中循環(huán)的剩余部分:

final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
    processSelectedKeys();
    runAllTasks();
} else {
    final long ioStartTime = System.nanoTime();

    processSelectedKeys();

    final long ioTime = System.nanoTime() - ioStartTime;
    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}

上面列出的代碼中, 有兩個(gè)關(guān)鍵的調(diào)用, 第一個(gè)是 processSelectedKeys() 調(diào)用, 根據(jù)字面意思, 我們可以猜出這個(gè)方法肯定是查詢就緒的 IO 事件, 然后處理它; 第二個(gè)調(diào)用是 runAllTasks(), 這個(gè)方法我們也可以一眼就看出來它的功能就是運(yùn)行 taskQueue 中的任務(wù).
這里的代碼還有一個(gè)十分有意思的地方, 即 ioRatio. 那什么是 ioRatio呢? 它表示的是此線程分配給 IO 操作所占的時(shí)間比(即運(yùn)行 processSelectedKeys 耗時(shí)在整個(gè)循環(huán)中所占用的時(shí)間). 例如 ioRatio 默認(rèn)是 50, 則表示 IO 操作和執(zhí)行 task 的所占用的線程執(zhí)行時(shí)間比是 1 : 1. 當(dāng)知道了 IO 操作耗時(shí)和它所占用的時(shí)間比, 那么執(zhí)行 task 的時(shí)間就可以很方便的計(jì)算出來了:

設(shè) IO 操作耗時(shí)為 ioTime, ioTime 占的時(shí)間比例為 ioRatio, 則:
    ioTime / ioRatio = taskTime / taskRatio
    taskRatio = 100 - ioRatio
    => taskTime = ioTime * (100 - ioRatio) / ioRatio

根據(jù)上面的公式, 當(dāng)我們?cè)O(shè)置 ioRate = 70 時(shí), 則表示 IO 運(yùn)行耗時(shí)占比為70%, 即假設(shè)某次循環(huán)一共耗時(shí)為 100ms, 那么根據(jù)公式, 我們知道 processSelectedKeys() 方法調(diào)用所耗時(shí)大概為70ms(即 IO 耗時(shí)), 而 runAllTasks() 耗時(shí)大概為 30ms(即執(zhí)行 task 耗時(shí)).
當(dāng) ioRatio 為 100 時(shí), Netty 就不考慮 IO 耗時(shí)的占比, 而是分別調(diào)用 processSelectedKeys()runAllTasks(); 而當(dāng) ioRatio 不為 100時(shí), 則執(zhí)行到 else 分支, 在這個(gè)分支中, 首先記錄下 processSelectedKeys() 所執(zhí)行的時(shí)間(即 IO 操作的耗時(shí)), 然后根據(jù)公式, 計(jì)算出執(zhí)行 task 所占用的時(shí)間, 然后以此為參數(shù), 調(diào)用 runAllTasks().

我們這里先分析一下 processSelectedKeys() 方法調(diào)用, runAllTasks() 我們留到下一節(jié)再分析.
processSelectedKeys() 方法的源碼如下:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

這個(gè)方法中, 會(huì)根據(jù) selectedKeys 字段是否為空, 而分別調(diào)用 processSelectedKeysOptimizedprocessSelectedKeysPlain. selectedKeys 字段是在調(diào)用 openSelector() 方法時(shí), 根據(jù) JVM 平臺(tái)的不同, 而有設(shè)置不同的值, 在我所調(diào)試這個(gè)值是不為 null 的. 其實(shí) processSelectedKeysOptimized 方法 processSelectedKeysPlain 沒有太大的區(qū)別, 為了簡(jiǎn)單起見, 我們以 processSelectedKeysOptimized 為例分析一下源碼的工作流程吧.

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask task = (NioTask) a;
            processSelectedKey(k, task);
        }
        ...
    }
}

其實(shí)你別看它代碼挺多的, 但是關(guān)鍵的點(diǎn)就兩個(gè): 迭代 selectedKeys 獲取就緒的 IO 事件, 然后為每個(gè)事件都調(diào)用 processSelectedKey 來處理它.
這里正好完美對(duì)應(yīng)上了我們提到的 Selector 的使用流程中的第三步里操作.
還有一點(diǎn)需要注意的是, 我們可以調(diào)用 selectionKey.attach(object) 給一個(gè) selectionKey 設(shè)置一個(gè)附加的字段, 然后可以通過 Object attachedObj = selectionKey.attachment() 獲取它. 上面代代碼正是通過了 k.attachment() 來獲取一個(gè)附加在 selectionKey 中的對(duì)象, 那么這個(gè)對(duì)象是什么呢? 它又是在哪里設(shè)置的呢? 我們?cè)賮砘貞浺幌?SocketChannel 是如何注冊(cè)到 Selector 中的:
在客戶端的 Channel 注冊(cè)過程中, 會(huì)有如下調(diào)用鏈:

Bootstrap.initAndRegister -> 
    AbstractBootstrap.initAndRegister -> 
        MultithreadEventLoopGroup.register -> 
            SingleThreadEventLoop.register -> 
                AbstractUnsafe.register ->
                    AbstractUnsafe.register0 ->
                        AbstractNioChannel.doRegister

最后的 AbstractNioChannel.doRegister 方法會(huì)調(diào)用 SocketChannel.register 方法注冊(cè)一個(gè) SocketChannel 到指定的 Selector:

@Override
protected void doRegister() throws Exception {
    // 省略錯(cuò)誤處理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

特別注意一下 register 的第三個(gè)參數(shù), 這個(gè)參數(shù)是設(shè)置 selectionKey 的附加對(duì)象的, 和調(diào)用 selectionKey.attach(object) 的效果一樣. 而調(diào)用 register 所傳遞的第三個(gè)參數(shù)是 this, 它其實(shí)就是一個(gè) NioSocketChannel 的實(shí)例. 那么這里就很清楚了, 我們?cè)趯?SocketChannel 注冊(cè)到 Selector 中時(shí), 將 SocketChannel 所對(duì)應(yīng)的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中.
再回到 processSelectedKeysOptimized 方法中, 當(dāng)我們獲取到附加的對(duì)象后, 我們就調(diào)用 processSelectedKey 來處理這個(gè) IO 事件:

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} else {
    @SuppressWarnings("unchecked")
    NioTask task = (NioTask) a;
    processSelectedKey(k, task);
}

processSelectedKey 方法源碼如下:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    ...
    try {
        int readyOps = k.readyOps();
        
        // 可讀事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        
        // 可寫事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        
        // 連接建立事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

這個(gè)代碼是不是很熟悉啊? 完全是 Java NIO 的 Selector 的那一套處理流程嘛!
processSelectedKey 中處理了三個(gè)事件, 分別是:

OP_READ, 可讀事件, 即 Channel 中收到了新數(shù)據(jù)可供上層讀取.

OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入數(shù)據(jù).

OP_CONNECT, 連接建立事件, 即 TCP 連接已經(jīng)建立, Channel 處于 active 狀態(tài).

下面我們分別根據(jù)這三個(gè)事件來看一下 Netty 是怎么處理的吧.

OP_READ 處理

當(dāng)就緒的 IO 事件是 OP_READ, 代碼會(huì)調(diào)用 unsafe.read() 方法, 即:

// 可讀事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        // Connection already closed - no need to handle write.
        return;
    }
}

unsafe 這個(gè)字段, 我們已經(jīng)和它打了太多的交道了, 在第一章 Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (客戶端) 中我們已經(jīng)對(duì)它進(jìn)行過濃墨重彩地分析了, 最后我們確定了它是一個(gè) NioSocketChannelUnsafe 實(shí)例, 負(fù)責(zé)的是 Channel 的底層 IO 操作.
我們可以利用 Intellij IDEA 提供的 Go To Implementations 功能, 尋找到這個(gè)方法的實(shí)現(xiàn). 最后我們發(fā)現(xiàn)這個(gè)方法沒有在 NioSocketChannelUnsafe 中實(shí)現(xiàn), 而是在它的父類 AbstractNioByteChannel 實(shí)現(xiàn)的, 它的實(shí)現(xiàn)源碼如下:

@Override
public final void read() {
    ...
    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);

            // 檢查讀取結(jié)果.
            ...

            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            ...

            totalReadAmount += localReadAmount;
        
            // 檢查是否是配置了自動(dòng)讀取, 如果不是, 則立即退出循環(huán).
            ...
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
    }
}

read() 源碼比較長, 我為了篇幅起見, 刪除了部分代碼, 只留下了主干. 不過我建議讀者朋友們自己一定要看一下 read() 源碼, 這對(duì)理解 Netty 的 EventLoop 十分有幫助.
上面 read 方法其實(shí)歸納起來, 可以認(rèn)為做了如下工作:

分配 ByteBuf

從 SocketChannel 中讀取數(shù)據(jù)

調(diào)用 pipeline.fireChannelRead 發(fā)送一個(gè) inbound 事件.

前面兩點(diǎn)沒什么好說的, 第三點(diǎn) pipeline.fireChannelRead 讀者朋友們看到了有沒有會(huì)心一笑地感覺呢? 反正我看到這里時(shí)是有的. pipeline.fireChannelRead 正好就是我們?cè)诘诙?Netty 源碼分析之 二 貫穿Netty 的大動(dòng)脈 ── ChannelPipeline (二) 中分析的 inbound 事件起點(diǎn). 當(dāng)調(diào)用了 pipeline.fireIN_EVT() 后, 那么就產(chǎn)生了一個(gè) inbound 事件, 此事件會(huì)以 head -> customContext -> tail 的方向依次流經(jīng) ChannelPipeline 中的各個(gè) handler.
調(diào)用了 pipeline.fireChannelRead 后, 就是 ChannelPipeline 中所需要做的工作了, 這些我們已經(jīng)在第二章中有過詳細(xì)討論, 這里就展開了.

OP_WRITE 處理

OP_WRITE 可寫事件代碼如下. 這里代碼比較簡(jiǎn)單, 沒有詳細(xì)分析的必要了.

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}
OP_CONNECT 處理

最后一個(gè)事件是 OP_CONNECT, 即 TCP 連接已建立事件.

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

OP_CONNECT 事件的處理中, 只做了兩件事情:

正如代碼中的注釋所言, 我們需要將 OP_CONNECT 從就緒事件集中清除, 不然會(huì)一直有 OP_CONNECT 事件.

調(diào)用 unsafe.finishConnect() 通知上層連接已建立

unsafe.finishConnect() 調(diào)用最后會(huì)調(diào)用到 pipeline().fireChannelActive(), 產(chǎn)生一個(gè) inbound 事件, 通知 pipeline 中的各個(gè) handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法會(huì)被調(diào)用)

到了這里, 我們整個(gè) NioEventLoop 的 IO 操作部分已經(jīng)了解完了, 接下來的一節(jié)我們要重點(diǎn)分析一下 Netty 的任務(wù)隊(duì)列機(jī)制.

Netty 的任務(wù)隊(duì)列機(jī)制

我們已經(jīng)提到過, 在Netty 中, 一個(gè) NioEventLoop 通常需要肩負(fù)起兩種任務(wù), 第一個(gè)是作為 IO 線程, 處理 IO 操作; 第二個(gè)就是作為任務(wù)線程, 處理 taskQueue 中的任務(wù). 這一節(jié)的重點(diǎn)就是分析一下 NioEventLoop 的任務(wù)隊(duì)列機(jī)制的.

Task 的添加 普通 Runnable 任務(wù)

NioEventLoop 繼承于 SingleThreadEventExecutor, 而 SingleThreadEventExecutor 中有一個(gè) Queue taskQueue 字段, 用于存放添加的 Task. 在 Netty 中, 每個(gè) Task 都使用一個(gè)實(shí)現(xiàn)了 Runnable 接口的實(shí)例來表示.
例如當(dāng)我們需要將一個(gè) Runnable 添加到 taskQueue 中時(shí), 我們可以進(jìn)行如下操作:

EventLoop eventLoop = channel.eventLoop();
eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("Hello, Netty!");
    }
});

當(dāng)調(diào)用 execute 后, 實(shí)際上是調(diào)用到了 SingleThreadEventExecutor.execute() 方法, 它的實(shí)現(xiàn)如下:

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

而添加任務(wù)的 addTask 方法的源碼如下:

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (isShutdown()) {
        reject();
    }
    taskQueue.add(task);
}

因此實(shí)際上, taskQueue 是存放著待執(zhí)行的任務(wù)的隊(duì)列.

schedule 任務(wù)

除了通過 execute 添加普通的 Runnable 任務(wù)外, 我們還可以通過調(diào)用 eventLoop.scheduleXXX 之類的方法來添加一個(gè)定時(shí)任務(wù).
EventLoop 中實(shí)現(xiàn)任務(wù)隊(duì)列的功能在超類 SingleThreadEventExecutor 實(shí)現(xiàn)的, 而 schedule 功能的實(shí)現(xiàn)是在 SingleThreadEventExecutor 的父類, 即 AbstractScheduledEventExecutor 中實(shí)現(xiàn)的.
AbstractScheduledEventExecutor 中, 有以 scheduledTaskQueue 字段:

Queue> scheduledTaskQueue;

scheduledTaskQueue 是一個(gè)隊(duì)列(Queue), 其中存放的元素是 ScheduledFutureTask. 而 ScheduledFutureTask 我們很容易猜到, 它是對(duì) Schedule 任務(wù)的一個(gè)抽象.
我們來看一下 AbstractScheduledEventExecutor 所實(shí)現(xiàn)的 schedule 方法吧:

@Override
public  ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");
    if (delay < 0) {
        throw new IllegalArgumentException(
                String.format("delay: %d (expected: >= 0)", delay));
    }
    return schedule(new ScheduledFutureTask(
            this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

這是其中一個(gè)重載的 schedule, 當(dāng)一個(gè) Runnable 傳遞進(jìn)來后, 會(huì)被封裝為一個(gè) ScheduledFutureTask 對(duì)象, 這個(gè)對(duì)象會(huì)記錄下這個(gè) Runnable 在何時(shí)運(yùn)行、已何種頻率運(yùn)行等信息.
當(dāng)構(gòu)建了 ScheduledFutureTask 后, 會(huì)繼續(xù)調(diào)用 另一個(gè)重載的 schedule 方法:

 ScheduledFuture schedule(final ScheduledFutureTask task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else {
        execute(new OneTimeTask() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    return task;
}

在這個(gè)方法中, ScheduledFutureTask 對(duì)象就會(huì)被添加到 scheduledTaskQueue 中了.

任務(wù)的執(zhí)行

當(dāng)一個(gè)任務(wù)被添加到 taskQueue 后, 它是怎么被 EventLoop 執(zhí)行的呢?
讓我們回到 NioEventLoop.run() 方法中, 在這個(gè)方法里, 會(huì)分別調(diào)用 processSelectedKeys()runAllTasks() 方法, 來進(jìn)行 IO 事件的處理和 task 的處理. processSelectedKeys() 方法我們已經(jīng)分析過了, 下面我們來看一下 runAllTasks() 中到底有什么名堂吧.
runAllTasks 方法有兩個(gè)重載的方法, 一個(gè)是無參數(shù)的, 另一個(gè)有一個(gè)參數(shù)的. 首先來看一下無參數(shù)的 runAllTasks:

protected boolean runAllTasks() {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            return true;
        }
    }
}

我們前面已經(jīng)提到過, EventLoop 可以通過調(diào)用 EventLoop.execute 來將一個(gè) Runnable 提交到 taskQueue 中, 也可以通過調(diào)用 EventLoop.schedule 來提交一個(gè) schedule 任務(wù)到 scheduledTaskQueue 中. 在此方法的一開始調(diào)用的 fetchFromScheduledTaskQueue() 其實(shí)就是將 scheduledTaskQueue 中已經(jīng)可以執(zhí)行的(即定時(shí)時(shí)間已到的 schedule 任務(wù)) 拿出來并添加到 taskQueue 中, 作為可執(zhí)行的 task 等待被調(diào)度執(zhí)行.
它的源碼如下:

private void fetchFromScheduledTaskQueue() {
    if (hasScheduledTasks()) {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        for (;;) {
            Runnable scheduledTask = pollScheduledTask(nanoTime);
            if (scheduledTask == null) {
                break;
            }
            taskQueue.add(scheduledTask);
        }
    }
}

接下來 runAllTasks() 方法就會(huì)不斷調(diào)用 task = pollTask()taskQueue 中獲取一個(gè)可執(zhí)行的 task, 然后調(diào)用它的 run() 方法來運(yùn)行此 task.

注意, 因?yàn)?EventLoop 既需要執(zhí)行 IO 操作, 又需要執(zhí)行 task, 因此我們?cè)谡{(diào)用 EventLoop.execute 方法提交任務(wù)時(shí), 不要提交耗時(shí)任務(wù), 更不能提交一些會(huì)造成阻塞的任務(wù), 不然會(huì)導(dǎo)致我們的 IO 線程得不到調(diào)度, 影響整個(gè)程序的并發(fā)量.

本文由 yongshun 發(fā)表于個(gè)人博客, 采用 署名-相同方式共享 3.0 中國大陸許可協(xié)議.
Email: yongshun1228@gmail .com
本文標(biāo)題為: Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(二)
本文鏈接為: https://segmentfault.com/a/1190000007403937

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/65280.html

相關(guān)文章

  • Netty 源碼分析 就是大名鼎鼎 EventLoop(一)

    摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡(jiǎn)介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...

    livem 評(píng)論0 收藏0
  • 源碼下無秘密 ── 做最好 Netty 源碼分析教程

    摘要:背景在工作中雖然我經(jīng)常使用到庫但是很多時(shí)候?qū)Φ囊恍└拍钸€是處于知其然不知其所以然的狀態(tài)因此就萌生了學(xué)習(xí)源碼的想法剛開始看源碼的時(shí)候自然是比較痛苦的主要原因有兩個(gè)第一網(wǎng)上沒有找到讓我滿意的詳盡的源碼分析的教程第二我也是第一次系統(tǒng)地學(xué)習(xí)這么大代 背景 在工作中, 雖然我經(jīng)常使用到 Netty 庫, 但是很多時(shí)候?qū)?Netty 的一些概念還是處于知其然, 不知其所以然的狀態(tài), 因此就萌生了學(xué)...

    shenhualong 評(píng)論0 收藏0
  • 【自己讀源碼Netty4.X系列() Channel Register

    摘要:我想這很好的解釋了中,僅僅一個(gè)都這么復(fù)雜,在單線程或者說串行的程序中,編程往往是很簡(jiǎn)單的,說白了就是調(diào)用,調(diào)用,調(diào)用然后返回。 Netty源碼分析(三) 前提概要 這次停更很久了,原因是中途迷茫了一段時(shí)間,不過最近調(diào)整過來了。不過有點(diǎn)要說下,前幾天和業(yè)內(nèi)某個(gè)大佬聊天,收獲很多,所以這篇博文和之前也會(huì)不太一樣,我們會(huì)先從如果是我自己去實(shí)現(xiàn)這個(gè)功能需要怎么做開始,然后去看netty源碼,與...

    darkbug 評(píng)論0 收藏0
  • Netty 源碼分析 貫穿Netty 大動(dòng)脈 ── ChannelPipeline (一)

    摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡(jiǎn)介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...

    tunny 評(píng)論0 收藏0
  • 原理剖析(第 011 篇)Netty服務(wù)端啟動(dòng)工作原理分析(下)

    摘要:原理剖析第篇之服務(wù)端啟動(dòng)工作原理分析下一大致介紹由于篇幅過長難以發(fā)布,所以本章節(jié)接著上一節(jié)來的,上一章節(jié)為原理剖析第篇之服務(wù)端啟動(dòng)工作原理分析上那么本章節(jié)就繼續(xù)分析的服務(wù)端啟動(dòng),分析的源碼版本為二三四章節(jié)請(qǐng)看上一章節(jié)詳見原理剖析第篇之 原理剖析(第 011 篇)Netty之服務(wù)端啟動(dòng)工作原理分析(下) - 一、大致介紹 1、由于篇幅過長難以發(fā)布,所以本章節(jié)接著上一節(jié)來的,上一章節(jié)為【原...

    Tikitoo 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<