亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

原理剖析(第 011 篇)Netty之服務(wù)端啟動(dòng)工作原理分析(下)

Tikitoo / 999人閱讀

摘要:原理剖析第篇之服務(wù)端啟動(dòng)工作原理分析下一大致介紹由于篇幅過(guò)長(zhǎng)難以發(fā)布,所以本章節(jié)接著上一節(jié)來(lái)的,上一章節(jié)為原理剖析第篇之服務(wù)端啟動(dòng)工作原理分析上那么本章節(jié)就繼續(xù)分析的服務(wù)端啟動(dòng),分析的源碼版本為二三四章節(jié)請(qǐng)看上一章節(jié)詳見(jiàn)原理剖析第篇之

原理剖析(第 011 篇)Netty之服務(wù)端啟動(dòng)工作原理分析(下)

-

一、大致介紹
1、由于篇幅過(guò)長(zhǎng)難以發(fā)布,所以本章節(jié)接著上一節(jié)來(lái)的,上一章節(jié)為【原理剖析(第 010 篇)Netty之服務(wù)端啟動(dòng)工作原理分析(上)】;
2、那么本章節(jié)就繼續(xù)分析Netty的服務(wù)端啟動(dòng),分析Netty的源碼版本為:netty-netty-4.1.22.Final;
二、三、四章節(jié)請(qǐng)看上一章節(jié)

詳見(jiàn) 原理剖析(第 010 篇)Netty之服務(wù)端啟動(dòng)工作原理分析(上)

四、源碼分析Netty服務(wù)端啟動(dòng)

上一章節(jié),我們主要分析了一下線程管理組對(duì)象是如何被實(shí)例化的,并且還了解到了每個(gè)線程管理組都有一個(gè)子線程數(shù)組來(lái)處理任務(wù);
那么接下來(lái)我們就直接從4.6開(kāi)始分析了:

4.6、為serverBootstrap添加配置參數(shù)
1、源碼:
    // NettyServer.java
    // 將 Boss、Worker 設(shè)置到 ServerBootstrap 服務(wù)端引導(dǎo)類中
    serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            // 指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel
            .localAddress("localhost", port)//設(shè)置InetSocketAddress讓服務(wù)器監(jiān)聽(tīng)某個(gè)端口已等待客戶端連接。
            .childHandler(new ChannelInitializer() {//設(shè)置childHandler執(zhí)行所有的連接請(qǐng)求
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new PacketHeadDecoder());
                    ch.pipeline().addLast(new PacketBodyDecoder());

                    ch.pipeline().addLast(new PacketHeadEncoder());
                    ch.pipeline().addLast(new PacketBodyEncoder());

                    ch.pipeline().addLast(new PacketHandler());
                }
            });

2、主要為后序的通信設(shè)置了一些配置參數(shù)而已,指定構(gòu)建的Channel為NioServerSocketChannel,說(shuō)明需要啟動(dòng)的是服務(wù)端Netty;
   而后面的服務(wù)端Channel實(shí)例化,就是需要通過(guò)這個(gè)參數(shù)反射實(shí)例化得到;

3、同時(shí)還設(shè)置childHandler,這個(gè)childHandler也是有順序的,服務(wù)端讀數(shù)據(jù)時(shí)執(zhí)行的順序是PacketHeadDecoder、PacketBodyDecoder、PacketHandler;
   而服務(wù)端寫數(shù)據(jù)時(shí)執(zhí)行的順序是PacketHandler、PacketBodyEncoder、PacketHeadEncoder;
   所以在書寫方式大家千萬(wàn)別寫錯(cuò)了,按照本示例代碼的方式書寫即可;
4.7、serverBootstrap調(diào)用bind綁定注冊(cè)
1、源碼:
    // NettyServer.java
    // 最后綁定服務(wù)器等待直到綁定完成,調(diào)用sync()方法會(huì)阻塞直到服務(wù)器完成綁定,然后服務(wù)器等待通道關(guān)閉,因?yàn)槭褂胹ync(),所以關(guān)閉操作也會(huì)被阻塞。
    ChannelFuture channelFuture = serverBootstrap.bind().sync();

2、這里其實(shí)沒(méi)什么好看的,接下來(lái)我們就主要看看這個(gè)bind()方法主要干了些啥,就這么簡(jiǎn)簡(jiǎn)單單一句代碼就把服務(wù)端給啟動(dòng)起來(lái)了,有點(diǎn)神氣了;
4.8、bind()操作
1、源碼:
    // AbstractBootstrap.java
    /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind() {
        validate();
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        return doBind(localAddress); // 創(chuàng)建一個(gè)Channel,并且綁定它
    }

    // AbstractBootstrap.java
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); // 初始化和注冊(cè)

        // 執(zhí)行到此,服務(wù)端大概完成了以下幾件事情:
        // 1、實(shí)例化NioServerSocketChannel,并為Channel配備了pipeline、config、unsafe對(duì)象;
        // 2、將多個(gè)handler添加至pipeline雙向鏈表中,并且等待Channel注冊(cè)成功后需要給每個(gè)handler觸發(fā)添加或者移除事件;
        // 3、將NioServerSocketChannel注冊(cè)到NioEventLoop的多路復(fù)用器上;

        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        // 既然NioServerSocketChannel的Channel綁定到了多路復(fù)用器上,那么接下來(lái)就是綁定地址,綁完地址就可以正式進(jìn)行通信了
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it"s not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

2、大致一看,原來(lái)doBind方法主要干了兩件事情,initAndRegister與doBind0;

3、initAndRegister主要做的事情就是初始化服務(wù)端Channel,并且將服務(wù)端Channel注冊(cè)到bossGroup子線程的多路復(fù)用器上;

4、doBind0則主要完成服務(wù)端啟動(dòng)的最后一步,綁定地址,綁定完后就可以正式進(jìn)行通信了;
4.9、initAndRegister()初始化和注冊(cè)
1、源碼:
    // AbstractBootstrap.java
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 反射調(diào)用clazz.getConstructor().newInstance()實(shí)例化類
            // 同時(shí)也實(shí)例化了Channel,如果是服務(wù)端的話則為NioServerSocketChannel實(shí)例化對(duì)象
            // 在實(shí)例化NioServerSocketChannel的構(gòu)造方法中,也為每個(gè)Channel創(chuàng)建了一個(gè)管道屬性對(duì)象DefaultChannelPipeline=pipeline對(duì)象
            // 在實(shí)例化NioServerSocketChannel的構(gòu)造方法中,也為每個(gè)Channel創(chuàng)建了一個(gè)配置屬性對(duì)象NioServerSocketChannelConfig=config對(duì)象
            // 在實(shí)例化NioServerSocketChannel的構(gòu)造方法中,也為每個(gè)Channel創(chuàng)建了一個(gè)unsafe屬性對(duì)象NioMessageUnsafe=unsafe對(duì)象
            channel = channelFactory.newChannel(); // 調(diào)用ReflectiveChannelFactory的newChannel方法

            // 初始化剛剛被實(shí)例化的channel
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // config().group()=bossGroup或parentGroup,然后利用parentGroup去注冊(cè)NioServerSocketChannel=channel
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it"s one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It"s safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop"s task queue for later execution.
        //    i.e. It"s safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

2、逐行分析后會(huì)發(fā)現(xiàn),首先通過(guò)反射實(shí)例化服務(wù)端channel對(duì)象,然后將服務(wù)端channel初始化一下;

3、然后調(diào)用bossGroup的注冊(cè)方法,將服務(wù)端channel作為參數(shù)傳入;

4、至此,方法名也表明該段代碼的意圖,實(shí)例化并初始化服務(wù)端Channel,然后注冊(cè)到bossGroup子線程的多路復(fù)用器上;
4.10、init服務(wù)端Channel
1、源碼:
    // ServerBootstrap.java
    @Override
    void init(Channel channel) throws Exception {
        final Map, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey key = (AttributeKey) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        // 服務(wù)端ServerSocketChannel的管道對(duì)象,Channel實(shí)例化的時(shí)候就被創(chuàng)建出來(lái)了
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry, Object>[] currentChildOptions;
        final Entry, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        ChannelInitializer tempHandler = new ChannelInitializer() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("initAndRegister.init.initChannel-->ch.eventLoop().execute");
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        };

        // 這里我將addLast的參數(shù)剝離出來(lái)了,方便查看閱讀
        p.addLast(tempHandler);
    }

    // DefaultChannelPipeline.java
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }    
    
    // DefaultChannelPipeline.java
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }    
    
    // DefaultChannelPipeline.java
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        // 這里加了synchronized關(guān)鍵字,因此說(shuō)addLast的新增動(dòng)作都是線程安全的
        // 然后再細(xì)看一下其它的方法,只要涉及到的handler的增刪改動(dòng)作的方法,那些方法的代碼塊都是經(jīng)過(guò)synchronized修飾了,保證操作過(guò)程中線程安全
        synchronized (this) {
            // 檢查handler的一些基本信息,若不是被Sharable注解過(guò)的話,而且已經(jīng)被添加到其他pipeline時(shí)則會(huì)拋出異常
            checkMultiplicity(handler);

            // 通過(guò)一系列參數(shù)的封裝,最后封裝成DefaultChannelHandlerContext對(duì)象
            newCtx = newContext(group, filterName(name, handler), handler);

            // 將newCtx添加到倒數(shù)第二的位置,即tail的前面一個(gè)位置
            // 這里的pipeline中的handler的構(gòu)成方式是一個(gè)雙向鏈表式的結(jié)構(gòu)
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // 該addLast方法可能會(huì)被其它各個(gè)地方調(diào)用,但是又為了保證handler的線程安全,則采用了synchronized來(lái)保證addLast的線程安全
            // 在Channel未注冊(cè)到多路復(fù)用器之前,registered肯定為false,那么則把需要添加的handler封裝成AbstractChannelHandlerContext對(duì)象,
            // 然后調(diào)用setAddPending方法,pengding意味著在將來(lái)的某個(gè)時(shí)刻調(diào)用,那到底在什么時(shí)刻被調(diào)用呢?
            // 英文解釋中提到一旦Channel注冊(cè)成功了的話則會(huì)被調(diào)用,所以Channel后續(xù)注冊(cè)完畢,再調(diào)用ChannelHandler.handlerAdded
            if (!registered) {
                newCtx.setAddPending();

                // 將newCtx追加到PendingHandlerCallback單向鏈表的隊(duì)尾,以便將來(lái)回調(diào)時(shí)用到
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }

        // 如果能順利執(zhí)行到這里來(lái)的話,則表明Channel已經(jīng)注冊(cè)到了NioEventLoop的多路復(fù)用器上面了
        // 然后接下來(lái)的就是觸發(fā)調(diào)用newCtx的ChannelHandler.handlerAdded方法
        callHandlerAdded0(newCtx);
        return this;
    }    
    
    // DefaultChannelPipeline.java
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev; // 將目前雙向鏈表tail的前驅(qū)結(jié)點(diǎn)找出來(lái)命名為prev
        newCtx.prev = prev; // 將新的結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)指向prev
        newCtx.next = tail; // 將新的結(jié)點(diǎn)的后驅(qū)結(jié)點(diǎn)指向tail
        prev.next = newCtx; // 將prev的后驅(qū)結(jié)點(diǎn)指向新的結(jié)點(diǎn)
        tail.prev = newCtx; // 將tail的前驅(qū)結(jié)點(diǎn)指向新的結(jié)點(diǎn)

        // 就這樣,將新的結(jié)點(diǎn)通過(guò)一系列的指針指向,順利的將新結(jié)點(diǎn)插到了tail的前面,
        // 也就是鏈表中倒數(shù)第2個(gè)結(jié)點(diǎn)的位置,原鏈表中倒數(shù)第2個(gè)結(jié)點(diǎn)變成倒數(shù)第3個(gè)結(jié)點(diǎn)
    }    
    
    // DefaultChannelPipeline.java
    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        // 根據(jù)added布爾值封裝成PendingHandlerAddedTask、PendingHandlerRemovedTask對(duì)象
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) { // 首次添加時(shí)則直接賦值然后返回
            pendingHandlerCallbackHead = task;
        } else {
            // 非首次賦值的話,那么通過(guò)while循環(huán)找到隊(duì)尾,然后將隊(duì)尾的next指向賦上task對(duì)象
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next; // 不停的尋找鏈表中的下一個(gè)結(jié)點(diǎn)
            }
            // 當(dāng)pending.next為空說(shuō)明已經(jīng)找到了隊(duì)尾結(jié)點(diǎn),然后將隊(duì)尾的next指向賦上task對(duì)象
            pending.next = task;
        }
    }    
    
2、其實(shí)初始化服務(wù)端Channel也做了蠻多的事情,事情再多也只是p.addLast(tempHandler)這句代碼干的事情多;

3、主要完成了服務(wù)端Channel中管道對(duì)象pipeline添加handler的操作,添加過(guò)程中主要有以下幾點(diǎn):
    ? 添加的過(guò)程中是由synchronized關(guān)鍵字來(lái)保證線程安全的;
    ? 將傳入的handler數(shù)組依次循環(huán)封裝成AbstractChannelHandlerContext對(duì)象添加到管道鎖維護(hù)的handler鏈表中;
    ? 當(dāng)未注冊(cè)成功時(shí)pipeline還維護(hù)了一個(gè)用后后序觸發(fā)調(diào)用newCtx的單向鏈表對(duì)象pendingHandlerCallbackHead;
    ? 當(dāng)注冊(cè)成功后,后序會(huì)迭代pendingHandlerCallbackHead對(duì)象依次執(zhí)行所有任務(wù)的run方法;
    ? 當(dāng)注冊(cè)成功后,還會(huì)觸發(fā)調(diào)用這些newCtx的一些方法,主要是newCtx的ChannelHandler.handlerAdded方法;
    
4、講到這里,initAndRegister總算講了一半了,接下來(lái)我們就要看看被實(shí)例化的服務(wù)端channel是如何注冊(cè)到多路復(fù)用器上的;
4.11、config().group().register(channel)
1、源碼:
    // MultithreadEventLoopGroup.java
    @Override
    public ChannelFuture register(Channel channel) {
        // next()對(duì)象其實(shí)是NioEventLoopGroup內(nèi)部中的children[]屬性中的其中一個(gè),通過(guò)一定規(guī)則挑選一個(gè)NioEventLoop
        // 那么也就是說(shuō)我們最終調(diào)用的是NioEventLoop來(lái)實(shí)現(xiàn)注冊(cè)channel
        return next().register(channel);

        // 從另外一個(gè)層面來(lái)講,我們要想注冊(cè)一個(gè)Channel,那么就可以直接調(diào)用NioEventLoopGroup父類中的register(Channel)即可注冊(cè)Channel,
        // 并且會(huì)按照一定的規(guī)則順序通過(guò)next()挑選一個(gè)NioEventLoop并將Channel綁定到它上面
        // 如果NioEventLoopGroup為bossGroup的話,那么該方法注冊(cè)的肯定是NioServerSocketChannel對(duì)象
        // 如果NioEventLoopGroup為workerGroup的話,那么該方法注冊(cè)的肯定是ServerSocketChannel對(duì)象
    }

    // SingleThreadEventLoop.java
    @Override
    public ChannelFuture register(Channel channel) {
        // 當(dāng)前this對(duì)象是屬于children[]屬性中的其中一個(gè)
        // 將傳入的Channel與當(dāng)前對(duì)象this一起封裝成DefaultChannelPromise對(duì)象
        // 然后再調(diào)用當(dāng)前對(duì)象的register(ChannelPromise)注冊(cè)方法
        return register(new DefaultChannelPromise(channel, this));
    }

    // SingleThreadEventLoop.java
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        // 校驗(yàn)當(dāng)前傳參是否為空,原則上既然是不可能為空的,因?yàn)樯弦粋€(gè)步驟是通過(guò)new出來(lái)的一個(gè)對(duì)象
        ObjectUtil.checkNotNull(promise, "promise");
        // promise.channel()其實(shí)就是上面new DefaultChannelPromise(channel, this)通過(guò)封裝后又取出這個(gè)channel對(duì)象
        // promise.channel().unsafe()而每個(gè)Channel都有一個(gè)unsafe對(duì)象,對(duì)于NioServerSocketChannel來(lái)說(shuō)NioMessageUnsafe=unsafe
        // 當(dāng)前this對(duì)象是屬于children[]屬性中的其中一個(gè)
        promise.channel().unsafe().register(this, promise);
        return promise;
    }    

    // AbstractUnsafe.java
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // eventLoop對(duì)象是屬于children[]屬性中的其中一個(gè)
        // 而當(dāng)前類又是Channel的一個(gè)抽象類AbstractChannel,也是NioServerSocketChannel的父類
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        // 這里的 this.eventLoop 就是Children[i]中的一個(gè),也就是具體執(zhí)行任務(wù)的線程封裝對(duì)象
        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) { // 如果對(duì)象eventLoop中的線程對(duì)象和當(dāng)前線程比對(duì)是一樣的話
            register0(promise); // 那么則直接調(diào)用注冊(cè)方法register0
        } else {
            try {
                // 比對(duì)的結(jié)果如果不一樣,十有八九都是該eventLoop的線程還未啟動(dòng),
                // 因此利用eventLoop的execute將register0(promise)方法作為任務(wù)添加到任務(wù)隊(duì)列中,并啟動(dòng)線程來(lái)執(zhí)行任務(wù)
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
                // 而服務(wù)端Channel的注冊(cè),走的是該else分支,因?yàn)榫€程都還沒(méi)創(chuàng)建,eventLoop.inEventLoop()肯定就是false結(jié)果
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
    
    // SingleThreadEventExecutor.java
    /**
     * 向任務(wù)隊(duì)列中添加任務(wù)task。
     *
     * @param task
     */
    @Override
    public void execute(Runnable task) {
        if (task == null) { // 如果傳入的task任務(wù)為空,則直接拋空指針異常,此方法嚴(yán)格控制傳入?yún)?shù)必須非空
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop(); // 判斷要添加的任務(wù)的這個(gè)線程,是不是和正在運(yùn)行的nioEventLoop的處于同一個(gè)線程?
        if (inEventLoop) { // 如果是,則說(shuō)明就是當(dāng)前線程正在添加task任務(wù),那么則直接調(diào)用addTask方法添加到隊(duì)列中
            addTask(task); // 添加task任務(wù)
        } else {
            startThread(); // 如果不是當(dāng)前線程,則看看實(shí)例化的對(duì)象nioEventLoop父類中state字段是否標(biāo)識(shí)有新建線程,沒(méi)有的話則利用線程池新創(chuàng)建一個(gè)線程,有的話則不用理會(huì)了
            addTask(task); // 添加task任務(wù)
            // 防止意外情況,還需要判斷下是否被關(guān)閉掉,如果被關(guān)閉掉的話,則將剛剛添加的任務(wù)刪除掉并采取拒絕策略直接拋出RejectedExecutionException異常
            if (isShutdown() && removeTask(task)) {
                reject(); // 拒絕策略直接拋出RejectedExecutionException異常
            }
        }

        // addTaskWakesUp:添加任務(wù)時(shí)需要喚醒標(biāo)志,默認(rèn)值為false,通過(guò)構(gòu)造方法傳進(jìn)來(lái)的也是false
        // wakesUpForTask(task):不是NonWakeupRunnable類型的task則返回true,意思就是只要不是NonWakeupRunnable類型的task,都需要喚醒阻塞操作
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    
2、通過(guò)一路跟蹤config().group().register(channel)該方法進(jìn)去,最后會(huì)發(fā)現(xiàn),源碼會(huì)調(diào)用一個(gè)register0(promise)的代碼來(lái)進(jìn)行注冊(cè);

3、但是跳出來(lái)一看,細(xì)細(xì)回味config().group().register(channel)這段代碼,可以得出這樣的一個(gè)結(jié)論:
   若以后大家想注冊(cè)channel的話,直接通過(guò)線程管理組調(diào)用register方法,傳入想要注冊(cè)的channel對(duì)象即可;
   
4、當(dāng)然還有一點(diǎn)請(qǐng)大家留意,execute(Runnable task)可以隨意調(diào)用添加任務(wù),如果線程已啟動(dòng)則直接添加,未啟動(dòng)的話則先啟動(dòng)線程再添加任務(wù);
   
5、那么我們還是先盡快進(jìn)入register0(promise)看看究竟是如何注冊(cè)channel的;
4.12、register0(promise)
1、源碼:
    // AbstractUnsafe.java
    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister(); // 調(diào)用Channel的注冊(cè)方法,讓Channel的子類AbstractNioChannel來(lái)實(shí)現(xiàn)注冊(cè)

            // 執(zhí)行到此,說(shuō)明Channel已經(jīng)注冊(cè)到了多路復(fù)用器上,并且也沒(méi)有拋出什么異常,那么接下來(lái)就賦值變量表明已經(jīng)注冊(cè)成功
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded(); // 會(huì)回調(diào)initAndRegister中init方法的p.addLast的initChannel回調(diào)

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) { // 檢測(cè)Channel是否處于活躍狀態(tài),這里調(diào)用的是底層的socket的活躍狀態(tài)
                if (firstRegistration) {
                    pipeline.fireChannelActive(); // 這里也是注冊(cè)成功后會(huì)僅僅只會(huì)被調(diào)用一次
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead(); // 設(shè)置Channel的讀事件
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

    // AbstractNioChannel.java
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) { // 自旋式的死循環(huán),如果正常操作不出現(xiàn)異常的話,那么則會(huì)一直嘗試將Channel注冊(cè)到多路復(fù)用器selector上面
            try {
                // eventLoop()對(duì)象是屬于children[]屬性中的其中一個(gè),children是NioEventLoop類型的對(duì)象
                // 而前面也了解到過(guò),在實(shí)例化每個(gè)children的時(shí)候,會(huì)為每個(gè)children創(chuàng)建一個(gè)多路復(fù)用器selector與unwrappedSelector
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                // 如果將Channel注冊(cè)到了多路復(fù)用器上的成功且沒(méi)有拋什么異常的話,則返回跳出循環(huán)
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }    
    
    // DefaultChannelPipeline.java
    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) { // pipeline標(biāo)識(shí)是否已注冊(cè),默認(rèn)值為true
            firstRegistration = false; // 馬上置位false,告訴大家該方法只會(huì)被調(diào)用一次
            // We are now registered to the EventLoop. It"s time to call the callbacks for the ChannelHandlers,
            // that were added before the registration was done.
            // 到此為止,我們已經(jīng)將Channel注冊(cè)到了NioEventLoop的多路復(fù)用器上,那么接下來(lái)是時(shí)候回調(diào)Handler被添加進(jìn)來(lái)
            callHandlerAddedForAllHandlers();
        }
    }

    // DefaultChannelPipeline.java
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered; // 測(cè)試registered是否為false,因?yàn)樵摲椒ㄒ呀?jīng)表明只會(huì)被調(diào)用一次,所以這里就嚴(yán)格判斷

            // This Channel itself was registered.
            registered = true; // 而且當(dāng)registered設(shè)置為true后,就不會(huì)再改變?cè)撝档臓顟B(tài)

            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC"ed.
            this.pendingHandlerCallbackHead = null;
        }

        // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
        // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
        // the EventLoop.
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        // 通過(guò)while循環(huán),單向鏈表一個(gè)個(gè)回調(diào)task的execute,該回調(diào)添加的就回調(diào)添加,該回調(diào)移除的則回調(diào)移除
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }

2、看完register0(promise)是不是覺(jué)得,原來(lái)服務(wù)端channel的注冊(cè)是這么簡(jiǎn)單,最后就是調(diào)用javaChannel().register(...)這個(gè)方法一下,然后就這么稀里糊涂的注冊(cè)到多路復(fù)用器上了;

3、在注冊(cè)完之際,還會(huì)找到之前的單向鏈表對(duì)象pendingHandlerCallbackHead,并且依依回調(diào)task.execute方法;

4、然后觸發(fā)fireChannelRegistered注冊(cè)成功事件,告知上層說(shuō)我們的服務(wù)端channel已經(jīng)注冊(cè)成功了,大家請(qǐng)知悉一下;

5、最后通過(guò)beginRead設(shè)置服務(wù)端的讀事件標(biāo)志,就是說(shuō)服務(wù)端的channel僅對(duì)讀事件感興趣;

6、至此initAndRegister這塊算是講完了,那么接下來(lái)就看看最后一個(gè)步驟綁定ip地址,完成通信前的最后一步;
4.13、doBind0(regFuture, channel, localAddress, promise)
1、源碼:
    // AbstractBootstrap.java
    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        // 服務(wù)端啟動(dòng)最后一個(gè)步驟,綁完地址就可以正式進(jìn)行通信了
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    // 服務(wù)端channel直接調(diào)用bind方法進(jìn)行綁定地址
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

    // AbstractChannel.java
    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    // DefaultChannelPipeline.java
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

    // AbstractChannelHandlerContext.java
    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

    // AbstractChannelHandlerContext.java
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }

    // HeadContext.java
    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

    // AbstractUnsafe.java
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();

        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }

        // See: https://github.com/netty/netty/issues/576
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
            localAddress instanceof InetSocketAddress &&
            !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
            !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
            // Warn a user about the fact that a non-root user can"t receive a
            // broadcast packet on *nix if the socket is bound on non-wildcard address.
            logger.warn(
                    "A non-root user can"t receive a broadcast packet if the socket " +
                    "is not bound to a wildcard address; binding to a non-wildcard " +
                    "address (" + localAddress + ") anyway as requested.");
        }

        boolean wasActive = isActive();
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }

        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }

        safeSetSuccess(promise);
    }

    // NioServerSocketChannel.java
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

2、經(jīng)過(guò)這么一路調(diào)用,其實(shí)最終會(huì)發(fā)現(xiàn),綁定地址也是通過(guò)javaChannel().bind(...)這么簡(jiǎn)短的一句話就搞定了;
   而前面的注冊(cè)到多路復(fù)用器上調(diào)用的是javaChannel().register(...)一句簡(jiǎn)短代碼;
   從而可得出這么一個(gè)結(jié)論:只要關(guān)系到channel的注冊(cè)綁定,最終核心底層都是調(diào)用這個(gè)channel的bind和register方法;

3、至此,服務(wù)端的啟動(dòng)流程算是完結(jié)了。。
五、總結(jié)
    最后我們來(lái)總結(jié)下,通過(guò)分析Netty的服務(wù)端啟動(dòng),經(jīng)過(guò)的流程如下:
    ? 創(chuàng)建兩個(gè)線程管理組,以及實(shí)例化每個(gè)線程管理組的子線程數(shù)組children[];
    ? 設(shè)置啟動(dòng)類參數(shù),比如channel、localAddress、childHandler等參數(shù);
    ? 反射實(shí)例化NioServerSocketChannel,創(chuàng)建ChannelId、unsafe、pipeline等對(duì)象;
    ? 初始化NioServerSocketChannel,設(shè)置attr、option,添加新的handler到服務(wù)端pipeline管道中;
    ? 調(diào)用JDK底層做ServerSocketChannel注冊(cè)到多路復(fù)用器上,并且注冊(cè)成功后回調(diào)pipeline管道中的單向鏈表依次執(zhí)行task任務(wù);
    ? 調(diào)用JDK底層做NioServerSocketChannel綁定端口,并觸發(fā)active事件;
六、下載地址

https://gitee.com/ylimhhmily/SpringCloudTutorial.git

SpringCloudTutorial交流QQ群: 235322432

SpringCloudTutorial交流微信群: 微信溝通群二維碼圖片鏈接

歡迎關(guān)注,您的肯定是對(duì)我最大的支持!!!

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/71082.html

相關(guān)文章

  • 原理剖析 010 Netty服務(wù)端啟動(dòng)工作原理分析(上)

    摘要:端引導(dǎo)類線程管理組線程管理組將設(shè)置到服務(wù)端引導(dǎo)類中指定通道類型為,一種異步模式,阻塞模式為設(shè)置讓服務(wù)器監(jiān)聽(tīng)某個(gè)端口已等待客戶端連接。 原理剖析(第 010 篇)Netty之服務(wù)端啟動(dòng)工作原理分析(上) - 一、大致介紹 1、Netty這個(gè)詞,對(duì)于熟悉并發(fā)的童鞋一點(diǎn)都不陌生,它是一個(gè)異步事件驅(qū)動(dòng)型的網(wǎng)絡(luò)通信框架; 2、使用Netty不需要我們關(guān)注過(guò)多NIO的API操作,簡(jiǎn)簡(jiǎn)單單的使用即可...

    coordinate35 評(píng)論0 收藏0
  • #yyds干貨盤點(diǎn)#學(xué)不懂Netty?看不懂源碼?不存在的,這文章手把手帶你閱讀Netty源碼

    摘要:簡(jiǎn)單來(lái)說(shuō)就是把注冊(cè)的動(dòng)作異步化,當(dāng)異步執(zhí)行結(jié)束后會(huì)把執(zhí)行結(jié)果回填到中抽象類一般就是公共邏輯的處理,而這里的處理主要就是針對(duì)一些參數(shù)的判斷,判斷完了之后再調(diào)用方法。 閱讀這篇文章之前,建議先閱讀和這篇文章關(guān)聯(lián)的內(nèi)容。 1. 詳細(xì)剖析分布式微服務(wù)架構(gòu)下網(wǎng)絡(luò)通信的底層實(shí)現(xiàn)原理(圖解) 2. (年薪60W的技巧)工作了5年,你真的理解Netty以及為什么要用嗎?(深度干貨)...

    zsirfs 評(píng)論0 收藏0
  • 后端經(jīng)驗(yàn)

    摘要:在結(jié)構(gòu)上引入了頭結(jié)點(diǎn)和尾節(jié)點(diǎn),他們分別指向隊(duì)列的頭和尾,嘗試獲取鎖入隊(duì)服務(wù)教程在它提出十多年后的今天,已經(jīng)成為最重要的應(yīng)用技術(shù)之一。隨著編程經(jīng)驗(yàn)的日積月累,越來(lái)越感覺(jué)到了解虛擬機(jī)相關(guān)要領(lǐng)的重要性。 JVM 源碼分析之 Jstat 工具原理完全解讀 http://click.aliyun.com/m/8315/ JVM 源碼分析之 Jstat 工具原理完全解讀 http:...

    i_garfileo 評(píng)論0 收藏0
  • Java面試通關(guān)要點(diǎn)匯總集

    摘要:本文會(huì)以引出問(wèn)題為主,后面有時(shí)間的話,筆者陸續(xù)會(huì)抽些重要的知識(shí)點(diǎn)進(jìn)行詳細(xì)的剖析與解答。敬請(qǐng)關(guān)注服務(wù)端思維微信公眾號(hào),獲取最新文章。 原文地址:梁桂釗的博客博客地址:http://blog.720ui.com 這里,筆者結(jié)合自己過(guò)往的面試經(jīng)驗(yàn),整理了一些核心的知識(shí)清單,幫助讀者更好地回顧與復(fù)習(xí) Java 服務(wù)端核心技術(shù)。本文會(huì)以引出問(wèn)題為主,后面有時(shí)間的話,筆者陸續(xù)會(huì)抽些重要的知識(shí)點(diǎn)進(jìn)...

    gougoujiang 評(píng)論0 收藏0
  • 從小白程序員一路晉升為大廠高級(jí)技術(shù)專家我看過(guò)哪些書籍?(建議收藏)

    摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報(bào)率高。馬上就十一國(guó)慶假期了,給小伙伴們分享下,從小白程序員到大廠高級(jí)技術(shù)專家我看過(guò)哪些技術(shù)類書籍。 大家好,我是...

    sf_wangchong 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

Tikitoo

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<