亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

線程池原理淺析

未東興 / 3502人閱讀

摘要:線程池主要解決兩個(gè)問(wèn)題一是當(dāng)執(zhí)行大量異步任務(wù)時(shí)線程池能夠提供很好的性能。二是線程池提供了一種資源限制和管理的手段,比如可以限制現(xiàn)成的個(gè)數(shù),動(dòng)態(tài)新增線程等。該方法返回一個(gè)對(duì)象,可指定線程池線程數(shù)量。

什么是線程池?

為了避免頻繁重復(fù)的創(chuàng)建和銷毀線程,我們可以讓這些線程進(jìn)行復(fù)用,在線程池中,總會(huì)有活躍的線程在占用,但是線程池中也會(huì)存在沒(méi)有占用的線程,這些線程處于空閑狀態(tài),當(dāng)有任務(wù)的時(shí)候會(huì)從池子里面拿去一個(gè)線程來(lái)進(jìn)行使用,當(dāng)完成工作后,并沒(méi)有銷毀線程,而是將將線程放回到池子中去。

線程池主要解決兩個(gè)問(wèn)題:

一是當(dāng)執(zhí)行大量異步任務(wù)時(shí)線程池能夠提供很好的性能。

二是線程池提供了一種資源限制和管理的手段,比如可以限制現(xiàn)成的個(gè)數(shù),動(dòng)態(tài)新增線程等。

? -《Java并發(fā)編程之美》

上面內(nèi)容出自《Java并發(fā)編程之美》這本書,第一個(gè)問(wèn)題上面已經(jīng)提到過(guò),線程的頻繁創(chuàng)建和銷毀是很損耗性能的,但是線程池中的線程是可以復(fù)用的,可以較好的提升性能問(wèn)題,線程池內(nèi)部是采用了阻塞隊(duì)列來(lái)維護(hù)Runnable對(duì)象。

原理分析

JDK為我們封裝了一套操作多線程的框架Executors,幫助我們可以更好的控制線程池,Executors下提供了一些線程池的工廠方法:

newFixedThreadPool:返回固定長(zhǎng)度的線程池,線程池中的線程數(shù)量是固定的。

newCacheThreadPool:該方法返回一個(gè)根據(jù)實(shí)際情況來(lái)進(jìn)行調(diào)整線程數(shù)量的線程池,空余線程存活時(shí)間是60s

newSingleThreadExecutor:該方法返回一個(gè)只有一個(gè)線程的線程池。

newSingleThreadScheduledExecutor:該方法返回一個(gè)SchemeExecutorService對(duì)象,線程池大小為1,SchemeExecutorService接口在ThreadPoolExecutor類和 ExecutorService接口之上的擴(kuò)展,在給定時(shí)間執(zhí)行某任務(wù)。

newSchemeThreadPool:該方法返回一個(gè)SchemeExecutorService對(duì)象,可指定線程池線程數(shù)量。

對(duì)于核心的線程池來(lái)說(shuō),它內(nèi)部都是使用了ThreadPoolExecutor對(duì)象來(lái)實(shí)現(xiàn)的,只不過(guò)內(nèi)部參數(shù)信息不一樣,我們先來(lái)看兩個(gè)例子:nexFixedThreadPoolnewSingleThreadExecutor如下所示:

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue(),
                                  threadFactory);
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue()));
}

由上面的線程池的創(chuàng)建過(guò)程可以看到它們都是ThreadPoolExecutor的封裝,接下來(lái)我們來(lái)看一下ThreadPoolExecutor的參數(shù)說(shuō)明:

參數(shù)名稱 參數(shù)描述
corePoolSize 指定線程池線程的數(shù)量
maximumPoolSize 指定線程池中線程的最大數(shù)量
keepAliveTime 當(dāng)線程池線程的數(shù)量超過(guò)corePoolSize的時(shí)候,多余的空閑線程存活的時(shí)間,如果超過(guò)了corePoolSize,在keepAliveTime的時(shí)間之后,銷毀線程
unit keepAliveTime的單位
workQueue 工作隊(duì)列,將被提交但尚未執(zhí)行的任務(wù)緩存起來(lái)
threadFactory 線程工廠,用于創(chuàng)建線程,不指定為默認(rèn)線程工廠DefaultThreadFactory
handler 拒絕策略

其中workQueue代表的是提交但未執(zhí)行的隊(duì)列,它是BlockingQueue接口的對(duì)象,用于存放Runable對(duì)象,主要分為以下幾種類型:

直接提交的隊(duì)列:SynchronousQueue隊(duì)列,它是一個(gè)沒(méi)有容量的隊(duì)列,前面我有對(duì)其進(jìn)行講解,當(dāng)線程池進(jìn)行入隊(duì)offer操作的時(shí)候,本身是無(wú)容量的,所以直接返回false,并沒(méi)有保存下來(lái),而是直接提交給線程來(lái)進(jìn)行執(zhí)行,如果沒(méi)有空余的線程則執(zhí)行拒絕策略。

有界的任務(wù)隊(duì)列:可以使用ArrayBlockingQueue隊(duì)列,因?yàn)樗鼉?nèi)部是基于數(shù)組來(lái)進(jìn)行實(shí)現(xiàn)的,初始化時(shí)必須指定容量參數(shù),當(dāng)使用有界任務(wù)隊(duì)列時(shí),當(dāng)有任務(wù)進(jìn)行提交時(shí),線程池的線程數(shù)量小于corePoolSize則創(chuàng)建新的線程來(lái)執(zhí)行任務(wù),當(dāng)線程池的線程數(shù)量大于corePoolSize的時(shí)候,則將提交的任務(wù)放入到隊(duì)列中,當(dāng)提交的任務(wù)塞滿隊(duì)列后,如果線程池的線程數(shù)量沒(méi)有超過(guò)maximumPoolSize,則創(chuàng)建新的線程執(zhí)行任務(wù),如果超過(guò)了maximumPoolSize則執(zhí)行拒絕策略。

無(wú)界的任務(wù)隊(duì)列:可以使用LinkedBlockingQueue隊(duì)列,它內(nèi)部是基于鏈表的形式,默認(rèn)隊(duì)列的長(zhǎng)度是Integer.MAX_VALUE,也可以指定隊(duì)列的長(zhǎng)度,當(dāng)隊(duì)列滿時(shí)進(jìn)行阻塞操作,當(dāng)然線程池中采用的是offer方法并不會(huì)阻塞線程,當(dāng)隊(duì)列滿時(shí)則返回false,入隊(duì)成功則則返回true,當(dāng)使用LinkedBlockingQueue隊(duì)列時(shí),有任務(wù)提交到線程池時(shí),如果線程池的數(shù)量小于corePoolSize,線程池會(huì)產(chǎn)生新的線程來(lái)執(zhí)行任務(wù),當(dāng)線程池的線程數(shù)量大于corePoolSize時(shí),則將提交的任務(wù)放入到隊(duì)列中,等待執(zhí)行任務(wù)的線程執(zhí)行完之后進(jìn)行消費(fèi)隊(duì)列中的任務(wù),若后續(xù)仍有新的任務(wù)提交,而沒(méi)有空閑的線程時(shí),它會(huì)不斷往隊(duì)列中入隊(duì)提交的任務(wù),直到資源耗盡。

優(yōu)先任務(wù)隊(duì)列:t有限任務(wù)隊(duì)列是帶有執(zhí)行優(yōu)先級(jí)的隊(duì)列,他可以使用PriorityBlockingQueue隊(duì)列,可以控制任務(wù)的執(zhí)行先后順序,它是一個(gè)無(wú)界隊(duì)列,該隊(duì)列可以根據(jù)任務(wù)自身的優(yōu)先級(jí)順序先后執(zhí)行,在確保性能的同時(shí),也能有很好的質(zhì)量保證。

上面講解了關(guān)于線程池內(nèi)部都是通過(guò)ThreadPoolExecutor來(lái)進(jìn)行實(shí)現(xiàn)的,那么下面我以一個(gè)例子來(lái)進(jìn)行源碼分析:

public class ThreadPoolDemo1 {

    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(5,
                10,
                60L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5), new CustomThreadFactory());
        for (int i = 0; i < 15; i++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(50000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("由線程:" + Thread.currentThread().getName() + "執(zhí)行任務(wù)完成");
            });
        }
    }
}

上面定義了一個(gè)線程池,線程池初始化的corePoolSize為5,也就是線程池中線程的數(shù)量為5,最大線程maximumThreadPoolSize為10,空余的線程存活的時(shí)間是60s,使用LinkedBlockingQueue來(lái)作為阻塞隊(duì)列,這里還發(fā)現(xiàn)我自定義了ThreadFactory線程池工廠,這里我真是針對(duì)線程創(chuàng)建的時(shí)候輸出線程池的名稱,源碼如下所示:

/**
 * 自定義的線程池構(gòu)造工廠
 */
public class CustomThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public CustomThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                poolNumber.getAndIncrement() +
                "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        String name = namePrefix + threadNumber.getAndIncrement();
        Thread t = new Thread(group, r,
                name,
                0);
        System.out.println("線程池創(chuàng)建,線程名稱為:" + name);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

}

代碼和DefaultThreadFactory一樣,只是在newThread新建線程的動(dòng)作的時(shí)候輸出了線程池的名稱,方便查看線程創(chuàng)建的時(shí)機(jī),上面main方法中提交了15個(gè)任務(wù),調(diào)用了execute方法來(lái)進(jìn)行提交任務(wù),在分析execute方法之前我們先了解一下線程的狀態(tài):

//假設(shè)Integer類型是32位的二進(jìn)制表示。
//高3位代表線程池的狀態(tài),低29位代表的是線程池的數(shù)量
//默認(rèn)是RUNNING狀態(tài),線程池的數(shù)量為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程個(gè)數(shù)位數(shù),表示的Integer中除去最高的3位之后剩下的位數(shù)表示線程池的個(gè)數(shù)
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程池的線程的最大數(shù)量
//這里舉例是32為機(jī)器,表示為00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//線程池的狀態(tài)
// runState is stored in the high-order bits
//11100000000000000000000000000000
//接受新任務(wù)并且處理阻塞隊(duì)列里面任務(wù)
private static final int RUNNING    = -1 << COUNT_BITS;
//00000000000000000000000000000000
//拒絕新任務(wù)但是處理阻塞隊(duì)列的任務(wù)
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//00100000000000000000000000000000
//拒接新任務(wù)并且拋棄阻塞隊(duì)列里面的任務(wù),同時(shí)會(huì)中斷正在處理的任務(wù)
private static final int STOP       =  1 << COUNT_BITS;
//01000000000000000000000000000000
//所有任務(wù)都執(zhí)行完(包括阻塞隊(duì)列中的任務(wù))后當(dāng)線程池活動(dòng)線程數(shù)為0,將要調(diào)用terminated方法。
private static final int TIDYING    =  2 << COUNT_BITS;
//01100000000000000000000000000000
//終止?fàn)顟B(tài),terminated方法調(diào)用完成以后的狀態(tài)
private static final int TERMINATED =  3 << COUNT_BITS;

通過(guò)上面內(nèi)容可以看到ctl其實(shí)存放的是線程池的狀態(tài)和線程數(shù)量的變量,默認(rèn)是RUNNING,也就是11100000000000000000000000000000,這里我們來(lái)假設(shè)運(yùn)行的機(jī)器上的Integer的是32位的,因?yàn)橛行C(jī)器上可能Integer并不是32位,下面COUNT_BITS來(lái)控制位數(shù),也就是先獲取Integer在該平臺(tái)上的位數(shù),比如說(shuō)是32位,然后32位-3位=29位,也就是低29位代表的是現(xiàn)成的數(shù)量,高3位代表線程的狀態(tài),可以清晰看到下面的線程池的狀態(tài)都是通過(guò)低位來(lái)進(jìn)行向左位移的操作的,除了上面的變量,還提供了操作線程池狀態(tài)的方法:

// 操作ctl變量,主要是進(jìn)行分解或組合線程數(shù)量和線程池狀態(tài)。
// 獲取高3位,獲取線程池狀態(tài)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 獲取低29位,獲取線程池中線程的數(shù)量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 組合ctl變量,rs=runStatue代表的是線程池的狀態(tài),wc=workCount代表的是線程池線程的數(shù)量
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don"t require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
//指定的線程池狀態(tài)c小于狀態(tài)s
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
//指定的線程池狀態(tài)c至少是狀態(tài)s
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// 判斷線程池是否運(yùn)行狀態(tài)
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

/**
 * CAS增加線程池線程數(shù)量.
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

/**
 * CAS減少線程池線程數(shù)量
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

/**
 * 將線程池的線程數(shù)量進(jìn)行較少操作,如果競(jìng)爭(zhēng)失敗直到競(jìng)爭(zhēng)成功為止。
 */
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

下來(lái)我們看一下ThreadPoolExecutor對(duì)象下的execute方法:

public void execute(Runnable command) {
      // 判斷提交的任務(wù)是不是為空,如果為空則拋出NullPointException異常
    if (command == null)
        throw new NullPointerException();
      // 獲取線程池的狀態(tài)和線程池的數(shù)量
    int c = ctl.get();
      // 如果線程池的數(shù)量小于corePoolSize,則進(jìn)行添加線程執(zhí)行任務(wù)
    if (workerCountOf(c) < corePoolSize) {
          //添加線程修改線程數(shù)量并且將command作為第一個(gè)任務(wù)進(jìn)行處理
        if (addWorker(command, true))
            return;
          // 獲取最新的狀態(tài)
        c = ctl.get();
    }
      // 如果線程池的狀態(tài)是RUNNING,將命令添加到隊(duì)列中
    if (isRunning(c) && workQueue.offer(command)) {
          //二次檢查線程池狀態(tài)和線程數(shù)量
        int recheck = ctl.get();
          //線程不是RUNNING狀態(tài),從隊(duì)列中移除當(dāng)前任務(wù),并且執(zhí)行拒絕策略。
          //這里說(shuō)明一點(diǎn),只有RUNNING狀態(tài)的線程池才會(huì)接受新的任務(wù),其余狀態(tài)全部拒絕。
        if (! isRunning(recheck) && remove(command))
            reject(command);
          //如果線程池的線程數(shù)量為空時(shí),代表線程池是空的,添加一個(gè)新的線程。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
      //如果隊(duì)列是滿的,或者是SynchronousQueue隊(duì)列時(shí),則直接添加新的線程執(zhí)行任務(wù),如果添加失敗則進(jìn)行拒絕
      //可能線程池的線程數(shù)量大于maximumPoolSize則采取拒絕策略。
    else if (!addWorker(command, false))
        reject(command);
}

通過(guò)分析execute方法總結(jié)以下幾點(diǎn):

當(dāng)線程池中線程的數(shù)量小于corePoolSize時(shí),直接添加線程到線程池并且將當(dāng)前任務(wù)做為第一個(gè)任務(wù)執(zhí)行。

如果線程池的狀態(tài)的是RUNNING,則可以接受任務(wù),將任務(wù)放入到阻塞隊(duì)列中,內(nèi)部進(jìn)行二次檢查,有可能在運(yùn)行下面內(nèi)容時(shí)線程池狀態(tài)已經(jīng)發(fā)生了變化,在這個(gè)時(shí)候如果線程池狀態(tài)變成不是RUNNING,則將當(dāng)前任務(wù)從隊(duì)列中移除,并且進(jìn)行拒絕策略。

如果阻塞隊(duì)列已經(jīng)滿了或者SynchronousQueue這種特殊隊(duì)列無(wú)空間的時(shí)候,直接添加新的線程執(zhí)行任務(wù),當(dāng)線程池的線程數(shù)量大于maximumPoolSize時(shí)相應(yīng)拒絕策略。

入隊(duì)操作用的是offer方法,該方法不會(huì)阻塞隊(duì)列,如果隊(duì)列已經(jīng)滿時(shí)或超時(shí)導(dǎo)致入隊(duì)失敗,返回false,如果入隊(duì)成功返回true。

針對(duì)上面例子源碼我們來(lái)做一下分析,我們?cè)创a中阻塞隊(duì)列采用的是ArrayBlockingQueue隊(duì)列,并且指定隊(duì)列的長(zhǎng)度是5,我們看下面提交的線程池的任務(wù)是15個(gè),而且corePoolSize設(shè)置的是5個(gè)核心線程,最大線程數(shù)(maximumPoolSzie)是10個(gè)(包括核心線程數(shù)),假設(shè)所有任務(wù)都同時(shí)提交到了線程池中,其中有5個(gè)任務(wù)會(huì)被提交到線程中作為第一個(gè)任務(wù)進(jìn)行執(zhí)行,會(huì)有5個(gè)任務(wù)被添加到阻塞隊(duì)列中,還有5個(gè)任務(wù)提交到到線程池中的時(shí)候發(fā)現(xiàn)阻塞隊(duì)列已經(jīng)滿了,這時(shí)候會(huì)直接提交任務(wù),發(fā)現(xiàn)當(dāng)前線程數(shù)是5小于最大線程數(shù),可以進(jìn)行新建線程來(lái)執(zhí)行任務(wù)。

1.png部提交,因?yàn)槲覀冊(cè)谌蝿?wù)中添加了Thread.sleep睡眠一會(huì),在for循環(huán)結(jié)束提交任務(wù)之后可能才會(huì)結(jié)束掉任務(wù)的睡眠執(zhí)行任務(wù)后面內(nèi)容,所以可以看做是全部提交任務(wù),但是沒(méi)有任務(wù)完成,如果有任務(wù)完成的話,可能就不會(huì)是觸發(fā)最大的線程數(shù),有可能就是一個(gè)任務(wù)完成后從隊(duì)列取出來(lái),然后另一個(gè)任務(wù)來(lái)的時(shí)候可以添加到隊(duì)列中,上圖中可以看到,有5個(gè)核心core線程在執(zhí)行任務(wù),任務(wù)隊(duì)列中有5個(gè)任務(wù)在等待空余線程執(zhí)行,而還有5個(gè)正在執(zhí)行的線程,核心線程是指在corePoolSize范圍的線程,而非核心線程指的是大于corePoolSize但是小于等于MaximumPoolSize的線程,就是這些非核心線程并不是一直存活的線程,它會(huì)跟隨線程池指定的參數(shù)來(lái)進(jìn)行銷毀,我們這里指定了60s后如果沒(méi)有任務(wù)提交,則會(huì)進(jìn)行銷毀操作,當(dāng)然工作線程并不指定那些線程必須回收那些線程就必須保留,是根據(jù)從隊(duì)列中獲取任務(wù)來(lái)決定,如果線程獲取任務(wù)時(shí)發(fā)現(xiàn)線程池中的線程數(shù)量大于corePoolSize,并且阻塞隊(duì)列中為空時(shí),則阻塞隊(duì)列會(huì)阻塞60s后如果還有沒(méi)有任務(wù)就返回false,這時(shí)候會(huì)釋放線程,調(diào)用processWorkerExit來(lái)處理線程的退出,接下來(lái)我們來(lái)分析下addWorker都做了什么內(nèi)容:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
          //獲取線程池的狀態(tài)和線程池線程的數(shù)量
        int c = ctl.get();
          //多帶帶獲取線程池的狀態(tài)
        int rs = runStateOf(c);

        //檢查隊(duì)列是否只在必要時(shí)為空
        if (rs >= SHUTDOWN &&                        //線程池的狀態(tài)是SHUTDOWN、STOP、TIDYING、TERMINATED
            ! (rs == SHUTDOWN &&                //可以看做是rs!=SHUTDOWN,線程池狀態(tài)為STOP、TIDYING、TERMINATED
               firstTask == null &&            //可以看做firstTask!=null,并且rs=SHUTDOWN
               ! workQueue.isEmpty()))    //可以看做rs=SHUTDOWN,并且workQueue.isEmpty()隊(duì)列為空
            return false;
                //循環(huán)CAS增加線程池中線程的個(gè)數(shù)
        for (;;) {
              //獲取線程池中線程個(gè)數(shù)
            int wc = workerCountOf(c);
              //如果線程池線程數(shù)量超過(guò)最大線程池?cái)?shù)量,則直接返回
            if (wc >= CAPACITY ||
                //如果指定使用corePoolSize作為限制則使用corePoolSize,反之使用maximumPoolSize,最為工作線程最大線程線程數(shù)量,如果工作線程大于相應(yīng)的線程數(shù)量則直接返回。
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
              //CAS增加線程池中線程的數(shù)量
            if (compareAndIncrementWorkerCount(c))
                  //跳出增加線程池?cái)?shù)量。
                break retry;
              //如果修改失敗,則重新獲取線程池的狀態(tài)和線程數(shù)量
            c = ctl.get();  // Re-read ctl
              //如果最新的線程池狀態(tài)和原有縣城出狀態(tài)不一樣時(shí),則跳轉(zhuǎn)到外層retry中,否則在內(nèi)層循環(huán)重新進(jìn)行CAS
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
        
      //工作線程是否開始啟動(dòng)標(biāo)志
    boolean workerStarted = false;
      //工作線程添加到線程池成功與否標(biāo)志
    boolean workerAdded = false;
    Worker w = null;
    try {
          //創(chuàng)建一個(gè)Worker對(duì)象
        w = new Worker(firstTask);
          //獲取worker中的線程,這里線程是通過(guò)ThreadFactory線程工廠創(chuàng)建出來(lái)的,詳細(xì)看下面源碼信息。
        final Thread t = w.thread;
          //判斷線程是否為空
        if (t != null) {
              //添加獨(dú)占鎖,為添加worker進(jìn)行同步操作,防止其他線程同時(shí)進(jìn)行execute方法。
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //獲取線程池的狀態(tài)
                int rs = runStateOf(ctl.get());
                                //如果線程池狀態(tài)為RUNNING或者是線程池狀態(tài)為SHUTDOWN并且第一個(gè)任務(wù)為空時(shí),當(dāng)線程池狀態(tài)為SHUTDOWN時(shí),是不允許添加新任務(wù)的,所以他會(huì)從隊(duì)列中獲取任務(wù)。
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                      //添加worker到集合中
                    workers.add(w);
                    int s = workers.size();
                      //跟蹤最大的線程池?cái)?shù)量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                      //添加worker成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
              //如果添加worker成功就啟動(dòng)任務(wù)
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
          //如果沒(méi)有啟動(dòng),w不為空就已出worker,并且線程池?cái)?shù)量進(jìn)行減少。
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

通過(guò)上面addWorker方法可以分為兩個(gè)部分來(lái)進(jìn)行講解,第一部分是對(duì)線程池中線程數(shù)量的通過(guò)CAS的方式進(jìn)行增加,其中第一部分中上面有個(gè)if語(yǔ)句,這個(gè)地方著重分析下:

if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
       firstTask == null &&
       ! workQueue.isEmpty()))
    return false;

可以看成下面的樣子,將!放到括號(hào)里面,變成下面的樣子:

if (rs >= SHUTDOWN &&
     (rs != SHUTDOWN ||
       firstTask != null ||
       workQueue.isEmpty()))
    return false;

線程池的狀態(tài)是SHUTDOWN、STOP、TIDYING、TERMINATED

當(dāng)線程池狀態(tài)是STOP、TIDYING、TERMINATED時(shí),這些狀態(tài)的時(shí)候不需要進(jìn)行線程的添加和啟動(dòng)操作,因?yàn)槿绻巧厦娴臓顟B(tài),其實(shí)線程池的線程正在進(jìn)行銷毀操作,意味著線程調(diào)用了shutdownNow等方法。

如果線程池狀態(tài)為SHUTDOWN并且第一個(gè)任務(wù)不為空時(shí),不接受新的任務(wù),直接返回false,也就是說(shuō)SHUTDOWN的狀態(tài),不會(huì)接受新任務(wù),只會(huì)針對(duì)隊(duì)列中未完成的任務(wù)進(jìn)行操作。

當(dāng)線線程池狀態(tài)為SHUTDOWN并且隊(duì)列為空時(shí),直接返回不進(jìn)行任務(wù)添加。

上半部分分為內(nèi)外兩個(gè)循環(huán),外循環(huán)對(duì)線程池狀態(tài)的判斷,用于判斷是否需要添加工作任務(wù)線程,通過(guò)上面講的內(nèi)容進(jìn)行判斷,后面內(nèi)循環(huán)則是通過(guò)CAS操作增加線程數(shù),如果指定了core參數(shù)為true,代表線程池中線程的數(shù)量沒(méi)有超過(guò)corePoolSize,當(dāng)指定為false時(shí),代表線程池中線程數(shù)量達(dá)到了corePoolSize,并且隊(duì)列已經(jīng)滿了,或者是SynchronousQueue這種無(wú)空間的隊(duì)列,但是還沒(méi)有達(dá)到最大的線程池maximumPoolSize,所以它內(nèi)部會(huì)根據(jù)指定的core參數(shù)來(lái)判斷是否已經(jīng)超過(guò)了最大的限制,如果超過(guò)了就不能進(jìn)行添加線程了,并且進(jìn)行拒絕策略,如果沒(méi)有超過(guò)就增加線程數(shù)量。

第二部分主要是把任務(wù)添加到worker中,并啟動(dòng)線程,這里我們先來(lái)看一下Worker對(duì)象。

// 這里發(fā)現(xiàn)它是實(shí)現(xiàn)了AQS,是一個(gè)不可重入的獨(dú)占鎖模式
// 并且它還集成了Runable接口,實(shí)現(xiàn)了run方法。
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    /** 執(zhí)行任務(wù)的線程,通過(guò)ThreadFactory創(chuàng)建 */
    final Thread thread;
    /** 初始化第一個(gè)任務(wù)*/
    Runnable firstTask;
    /** 每個(gè)線程完成任務(wù)的數(shù)量 */
    volatile long completedTasks;

    /**
     * 首先現(xiàn)將state值設(shè)置為-1,因?yàn)樵贏QS中state=0代表的是鎖沒(méi)有被占用,而且在線程池中shutdown方法會(huì)判斷能否爭(zhēng)搶到鎖,如果可以獲得鎖則對(duì)線程進(jìn)行中斷操作,如果調(diào)用了shutdownNow它會(huì)判斷state>=0會(huì)被中斷。
     * firstTask第一個(gè)任務(wù),如果為空則會(huì)從隊(duì)列中獲取任務(wù),后面runWorker中。
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** 委托調(diào)用外部的runWorker方法 */
    public void run() {
        runWorker(this);
    }

        //是否獨(dú)占鎖
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
        
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
        //這里就是上面shutdownNow中調(diào)用的線程中斷的方法,getState()>=0
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

可以看到Worker是一個(gè)實(shí)現(xiàn)了AQS的鎖,它是一個(gè)不可重入的獨(dú)占鎖,并且他也實(shí)現(xiàn)了Runnable接口,實(shí)現(xiàn)了run方法,在構(gòu)造函數(shù)中將AQS的state設(shè)置為-1,為了避免線程還沒(méi)有進(jìn)入runWorker方法前,就調(diào)用了shutdownshutdownNow方法,會(huì)被中斷,設(shè)置為-1則不會(huì)被中斷。后面我們看到run方法,它調(diào)用的是ThreadPoolExecutorrunWorker方法,我們這里回想一下,在addWorker方法中,添加workerHashSet中后,他會(huì)將workerAdded設(shè)置為true,代表添加worker成功,后面有調(diào)用了下面代碼:

if (workerAdded) {
    t.start();
    workerStarted = true;
}

這個(gè)t代表的就是在Worker構(gòu)造函數(shù)中的使用ThreadFactory創(chuàng)建的線程,并且將自己(Worker自己)傳遞了當(dāng)前線程,創(chuàng)建的線程就是任務(wù)線程,任務(wù)線程啟動(dòng)的時(shí)候會(huì)調(diào)用Worker下的run方法,run方法內(nèi)部又委托給外部方法runWorker來(lái)進(jìn)行操作,它的參數(shù)傳遞的是調(diào)用者自己,Worker中的run方法如下所示:

public void run() {
    runWorker(this);             //this指Worker對(duì)象本身
}

這里簡(jiǎn)單畫一張圖來(lái)表示下調(diào)用的邏輯。
2.png

整體的邏輯是先進(jìn)行創(chuàng)建線程,線程將Worker設(shè)置為執(zhí)行程序,并將線程塞到Worker中,然后再addWorker中將Worker中的線程取出來(lái),進(jìn)行啟動(dòng)操作,啟動(dòng)后他會(huì)調(diào)用Worker中的run方法,然后run方法中將調(diào)用ThreadPoolExecutor的runWorker,然后runWorker又會(huì)調(diào)用Worker中的任務(wù)firstTask,這個(gè)fistTask是要真正執(zhí)行的任務(wù),也是用戶自己實(shí)現(xiàn)的代碼邏輯。

接下來(lái)我們就要看一下runWorker方法里面具體內(nèi)容:

final void runWorker(Worker w) {
      //調(diào)用者也就是Worker中的線程
    Thread wt = Thread.currentThread();
      //獲取Worker中的第一個(gè)任務(wù)
    Runnable task = w.firstTask;
      //將Worker中的任務(wù)清除代表執(zhí)行了第一個(gè)任務(wù)了,后面如果再有任務(wù)就從隊(duì)列中獲取。
    w.firstTask = null;
      //這里還記的我們?cè)趎ew Worker的時(shí)候?qū)QS的state狀態(tài)設(shè)置為-1,這里先進(jìn)行解鎖操作,將state設(shè)置為0
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
          //循環(huán)進(jìn)行獲取任務(wù),如果第一個(gè)任務(wù)不為空,或者是如果第一個(gè)任務(wù)為空,從任務(wù)隊(duì)列中獲取任務(wù),如果有任務(wù)則返回獲取的任務(wù)信息,如果沒(méi)有任務(wù)可以獲取則進(jìn)行阻塞,阻塞也分兩種第一種是阻塞直到任務(wù)隊(duì)列中有內(nèi)容,第二種是阻塞隊(duì)列一定時(shí)間之后還是沒(méi)有任務(wù)就直接返回null。
        while (task != null || (task = getTask()) != null) {
              //先獲取worker的獨(dú)占鎖,防止其他線程調(diào)用了shutdown方法。
            w.lock();
            // 如果線程池正在停止,確保線程是被中斷的,如果沒(méi)有則確保線程不被中斷操作。
            if ((runStateAtLeast(ctl.get(), STOP) || //如果線程池狀態(tài)為STOP、TIDYING、TERMINATED直接拒絕任務(wù)中斷當(dāng)前線程
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                  //執(zhí)行任務(wù)之前做一些操作,可進(jìn)行自定義
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                      //運(yùn)行任務(wù)在這里嘍。
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                      //執(zhí)行任務(wù)之后做一些操作,可進(jìn)行自定義
                    afterExecute(task, thrown);
                }
            } finally {
                  //將任務(wù)清空為了下次任務(wù)獲取
                task = null;
                  //統(tǒng)計(jì)當(dāng)前Worker完成了多少任務(wù)
                w.completedTasks++;
                  //獨(dú)占鎖釋放
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
          //處理Worker的退出操作,執(zhí)行清理工作。
        processWorkerExit(w, completedAbruptly);
    }
}

我們看到如果Worker是第一次被啟動(dòng),它會(huì)從Worker中獲取firstTask任務(wù)來(lái)執(zhí)行,然后執(zhí)行成功后,它會(huì)getTask()來(lái)從隊(duì)列中獲取任務(wù),這個(gè)地方比較有意思,它是分情況進(jìn)行獲取任務(wù)的,我們都直到BlockingQueue中提供了幾種從隊(duì)列中獲取的方法,這個(gè)getTask中使用了兩種方式,第一種是使用poll進(jìn)行獲取隊(duì)列中的信息,它采用的是過(guò)一點(diǎn)時(shí)間如果隊(duì)列中仍沒(méi)有任務(wù)時(shí)直接返回null,然后還有一個(gè)就是take方法,take方法是如果隊(duì)列中沒(méi)有任務(wù)則將當(dāng)前線程進(jìn)行阻塞,等待隊(duì)列中有任務(wù)后,會(huì)通知等待的隊(duì)列線程進(jìn)行消費(fèi)任務(wù),讓我們看一下getTask方法:

private Runnable getTask() {
    boolean timedOut = false; //poll獲取超時(shí)

    for (;;) {
          //獲取線程池的狀態(tài)和線程數(shù)量
        int c = ctl.get();
          //獲取線程池的狀態(tài)
        int rs = runStateOf(c);

        //線程池狀態(tài)大于等于SHUTDOWN
          //1.線程池如果是大于STOP的話減少工作線程池?cái)?shù)量
          //2.如果線程池狀態(tài)為SHUTDOW并且隊(duì)列為空時(shí),代表隊(duì)列任務(wù)已經(jīng)執(zhí)行完,返回null,線程數(shù)量減少1
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
                //獲取線程池?cái)?shù)量。
        int wc = workerCountOf(c);

        //如果allowCoreThreadTimeOut為true,則空閑線程在一定時(shí)間未獲得任務(wù)會(huì)清除
          //或者如果線程數(shù)量大于corePoolSize的時(shí)候會(huì)進(jìn)行清除空閑線程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                //1.如果線程池?cái)?shù)量大于最大的線程池?cái)?shù)量或者對(duì)(空余線程進(jìn)行清除操作并且poll超時(shí)了,意思是隊(duì)列中沒(méi)有內(nèi)容了,導(dǎo)致poll間隔一段時(shí)間后沒(méi)有獲取內(nèi)容超時(shí)了。
          //2.如果線程池的數(shù)量大于1或者是隊(duì)列已經(jīng)是空的
          //總之意思就是當(dāng)線程池的線程池?cái)?shù)量大于corePoolSize,或指定了allowCoreThreadTimeOut為true,當(dāng)隊(duì)列中沒(méi)有數(shù)據(jù)或者線程池?cái)?shù)量大于1的情況下,嘗試對(duì)線程池的數(shù)量進(jìn)行減少操作,然后返回null,用于上一個(gè)方法進(jìn)行清除操作。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
              //如果timed代表的是清除空閑線程的意思
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    //等待一段時(shí)間如果沒(méi)有獲取到返回null。
                workQueue.take();                    //阻塞當(dāng)前線程
              //如果隊(duì)列中獲取到內(nèi)容則返回
            if (r != null)
                return r;
                        //如果沒(méi)有獲取到超時(shí)了則設(shè)置timeOut狀態(tài)
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

工作線程調(diào)用getTask從隊(duì)列中進(jìn)行獲取任務(wù)。

如果指定了allowCoreThreadTimeOut或線程池線程數(shù)量大于corePoolSize則進(jìn)行清除空閑多余的線程,調(diào)用阻塞隊(duì)列的poll方法,在指定時(shí)間內(nèi)如果沒(méi)有獲取到任務(wù)直接返回false。

如果線程池中線程池?cái)?shù)量小于corePoolSize或者allowCoreThreadTimeOut為false默認(rèn)值,則進(jìn)行阻塞線程從隊(duì)列中獲取任務(wù),直到隊(duì)列有任務(wù)喚醒線程。

我們還記得第一張圖中有標(biāo)記出來(lái)是core線程和普通線程,其實(shí)這樣標(biāo)記不是很準(zhǔn)確,準(zhǔn)確的意思是如果線程池的數(shù)量超過(guò)了corePoolSize并且沒(méi)有特別指定allowCoreThreadTimeOut的情況下,它會(huì)清除掉大于corePoolSize并且小于等于maximumPoolSize的一些線程,標(biāo)記出core線程的意思是有corePoolSize不會(huì)被清除,但是會(huì)清除大于corePoolSize的線程,也就是線程池中的線程對(duì)獲取任務(wù)的時(shí)候進(jìn)行判斷,也就是getTask中進(jìn)行判斷,如果當(dāng)前線程池的線程數(shù)量大于corePoolSize就使用poll方式獲取隊(duì)列中的任務(wù),當(dāng)過(guò)一段時(shí)間還沒(méi)有任務(wù)就會(huì)返回null,返回null之后設(shè)置timeOut=true,并且獲取getTask也會(huì)返回null,到此會(huì)跳到調(diào)用者runWorker方法中,一直在while (task != null || (task = getTask()) != null)此時(shí)的getTask返回null跳出while循環(huán)語(yǔ)句,設(shè)置completedAbruptly = false,表示不是突然完成的而是正常完成,退出后它會(huì)執(zhí)行finally的processWorkerExit(w, completedAbruptly),執(zhí)行清理工作。我們來(lái)看下源碼:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly)                 // 如果突然完成則調(diào)整線程數(shù)量
        decrementWorkerCount();        // 減少線程數(shù)量1

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();                                                        //獲取鎖,同時(shí)只有一個(gè)線程獲得鎖
    try {
        completedTaskCount += w.completedTasks;    //統(tǒng)計(jì)整個(gè)線程池完成的數(shù)量
        workers.remove(w);                                            //將完成任務(wù)的worker從HashSet中移除
    } finally {
        mainLock.unlock();                                            //釋放鎖
    }
        //嘗試設(shè)置線程池狀態(tài)為TERMINATED
      //1.如果線程池狀態(tài)為SHUTDOWN并且線程池線程數(shù)量與工作隊(duì)列為空時(shí),修改狀態(tài)。
      //2.如果線程池狀態(tài)為STOP并且線程池線程數(shù)量為空時(shí),修改狀態(tài)。
    tryTerminate();                                
        
      // 獲取線程池的狀態(tài)和線程池的數(shù)量
    int c = ctl.get();
      // 如果線程池的狀態(tài)小于STOP,也就是SHUTDOWN或RUNNING狀態(tài)
    if (runStateLessThan(c, STOP)) {
          //如果不是突然完成,也就是正常結(jié)束
        if (!completedAbruptly) {
              //如果指定allowCoreThreadTimeOut=true(默認(rèn)false)則代表線程池中有空余線程時(shí)需要進(jìn)行清理操作,否則線程池中的線程應(yīng)該保持corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
              //這里判斷如果線程池中隊(duì)列為空并且線程數(shù)量最小為0時(shí),將最小值調(diào)整為1,因?yàn)殛?duì)列中還有任務(wù)沒(méi)有完成需要增加隊(duì)列,所以這里增加了一個(gè)線程。
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
          //如果當(dāng)前線程數(shù)效益核心個(gè)數(shù),就增加一個(gè)Worker
        addWorker(null, false);
    }

通過(guò)上面的源碼可以得出,如果線程數(shù)超過(guò)核心線程數(shù)后,在runWorker中就不會(huì)等待隊(duì)列中的消息,而是會(huì)進(jìn)行清除操作,上面的清除代碼首先是先對(duì)線程池的數(shù)量進(jìn)行較少操作,其次是統(tǒng)計(jì)整個(gè)線程池中完成任務(wù)的數(shù)量,然后就是嘗試修改線程池的狀態(tài)由SHUTDOWN->TIDYING->TERMINATED或者是由STOP->TIDYING->TERMINATED,修改線程池狀態(tài)為TERMINATED,需要有兩個(gè)條件:

當(dāng)線程池線程數(shù)量和工作隊(duì)列為空,并且線程池的狀態(tài)為SHUTDOWN時(shí),才會(huì)將狀態(tài)進(jìn)行修改,修改的過(guò)程是SHUTDOWN->TIDYING->TERMINATED

當(dāng)線程池的狀態(tài)為STOP并且線程池?cái)?shù)量為空時(shí),才會(huì)嘗試修改狀態(tài),修改過(guò)程是STOP->TIDYING->TERMINATED

如果設(shè)置為TERMINATED狀態(tài),還需要調(diào)用條件變量terminationsignalAll()方法來(lái)喚醒所有因?yàn)檎{(diào)用awaitTermination方法而被阻塞的線程,換句話說(shuō)當(dāng)調(diào)用awaitTermination后,只有線程池狀態(tài)變成TERMINATED才會(huì)被喚醒。

接下來(lái)我們就來(lái)分析一下這個(gè)tryTerminate方法,看一下他到底符不符合我們上述說(shuō)的內(nèi)容:

final void tryTerminate() {
    for (;;) {
          // 獲取線程池的狀態(tài)和線程池的數(shù)量組合狀態(tài)
        int c = ctl.get();
          //這里多帶帶下面進(jìn)行分析,這里說(shuō)明兩個(gè)問(wèn)題,需要反向來(lái)想這個(gè)問(wèn)題。
          //1.如果線程池狀態(tài)STOP則不進(jìn)入if語(yǔ)句
          //2.如果線程池狀態(tài)為SHUTDOWN并且工作隊(duì)列為空時(shí),不進(jìn)入if語(yǔ)句
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
          //如果線程池?cái)?shù)量不為空時(shí),進(jìn)行中斷操作。
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
              //修改狀態(tài)為TIDYING,并且將線程池的數(shù)量進(jìn)行清空
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                      //執(zhí)行一些邏輯,默認(rèn)是空的
                    terminated();
                } finally {
                      //修改狀態(tài)為TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                      //喚醒調(diào)用awaitTermination方法的線程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }

我們多帶帶將上面的if語(yǔ)句摘出來(lái)進(jìn)行分析,將上面的第一個(gè)if判斷進(jìn)行修改如下,可以看到return在else里面,這時(shí)候內(nèi)部if判斷進(jìn)行轉(zhuǎn)換,轉(zhuǎn)換成如下所示:

if (!isRunning(c) &&            
    !runStateAtLeast(c, TIDYING) && //只能是SHUTDOWN和STOP
    (runStateOf(c) != SHUTDOWN ||  workQueue.isEmpty())){
    //這里執(zhí)行邏輯
}else {
        return;
}

逐一分析分析內(nèi)容如下:

!isRunning(c)代表不是RUNNING,則可能的是SHUTDOWN,STOP,TIDYING,TERMINATED這四種狀態(tài)

中間的連接符是并且的意思,跟著runStateAtLeast(c, TIDYING)這句話的意思是至少是TIDYING,TERMINATED這兩個(gè),反過(guò)來(lái)就是可能是RUNNINGSHUTDOWN,STOP,但是前面已經(jīng)判斷了不能是RUNINNG狀態(tài),所以前面兩個(gè)連在一起就是只能是狀態(tài)為SHUTDOWN,STOP

runStateOf(c) != SHUTDOWN || workQueue.isEmpty()當(dāng)前面的狀態(tài)是SHUTDOWN時(shí),則會(huì)出發(fā)workQueue.isEmpty(),連在一起就是狀態(tài)是SHUTDOWN并工作隊(duì)列為空,當(dāng)線程池狀態(tài)為STOP時(shí),則會(huì)進(jìn)入到runStateOf(c) != SHUTDOWN,直接返回true,就代表線程池狀態(tài)為STOP

后面還有一個(gè)語(yǔ)句一個(gè)if語(yǔ)句將其轉(zhuǎn)換一下邏輯就是下面的內(nèi)容:

if (workerCountOf(c) == 0) { 
         //執(zhí)行下面的邏輯   
}else{
      interruptIdleWorkers(ONLY_ONE);
    return;
}

這里我們也進(jìn)行轉(zhuǎn)換下,就可以看出來(lái)當(dāng)線程池的數(shù)量為空時(shí),才會(huì)進(jìn)行下面的邏輯,下面的邏輯就是修改線程池狀態(tài)為TERMINATED,兩個(gè)連在一起就是上面分析的修改狀態(tài)為TERMINATED的條件,這里畫一張圖來(lái)表示線程池狀態(tài)的信息:

3.png

其實(shí)上面圖中我們介紹了關(guān)于從SHUTDOWNSTOPTERMINATED的變化,沒(méi)有講解關(guān)于如何從RUNNING狀態(tài)轉(zhuǎn)變成SHUTDOWNSTOP狀態(tài),其實(shí)是調(diào)用了shutdown()shutdownNow方法對(duì)其進(jìn)行狀態(tài)的變換,下面來(lái)看一下shutdown方法源碼:

public void shutdown() {
      //獲取全局鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
          //權(quán)限檢查
        checkShutdownAccess();
          //設(shè)置線程池狀態(tài)為SHUTDOWN,如果狀態(tài)已經(jīng)是大于等于SHUTDOWN則直接返回
        advanceRunState(SHUTDOWN);
          //如果線程沒(méi)有設(shè)置中斷標(biāo)識(shí)并且線程沒(méi)有運(yùn)行則設(shè)置中斷標(biāo)識(shí)
        interruptIdleWorkers();
          //空的可以實(shí)現(xiàn)的內(nèi)容
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
      //嘗試修改線程池狀態(tài)為TERMINATED
    tryTerminate();
}

首先對(duì)當(dāng)前線程進(jìn)行權(quán)限檢測(cè),查看是否設(shè)置了安全管理器,如果設(shè)置了則要看當(dāng)前調(diào)用shutdown的線程有沒(méi)有權(quán)限都關(guān)閉線程的權(quán)限,如果有權(quán)限還要看是否有中斷工作現(xiàn)成的權(quán)限,如果沒(méi)有權(quán)限則拋出SecurityExceptionNullPointException異常。

設(shè)置線程池狀態(tài)為SHUTDOWN,如果狀態(tài)已經(jīng)是大于等于SHUTDOWN則直接返回

如果線程沒(méi)有設(shè)置中斷標(biāo)識(shí)并且線程沒(méi)有運(yùn)行則設(shè)置中斷標(biāo)識(shí)

嘗試修改線程池狀態(tài)為TERMINATED

接下來(lái)我們來(lái)看一下advanceRunState內(nèi)容如下所示:

private void advanceRunState(int targetState) {
    for (;;) {
          //獲取線程池狀態(tài)和線程池的線程數(shù)量
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||        //如果線程池的狀態(tài)>=SHUTDOWN
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))    //設(shè)置線程池狀態(tài)為SHUTDOWN
              //返回
            break;                                        
    }
}

當(dāng)線程池的狀態(tài)>=SHUTDOWN,直接返回

如果線程池狀態(tài)為RUNNING,設(shè)置線程池狀態(tài)為SHUTDOWN,設(shè)置成功則返回

interruptIdleWorkers代碼如下所示:

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
      //獲取全局鎖,同時(shí)只能有一個(gè)線程能夠調(diào)用shutdown方法
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
          //遍歷工作線程
        for (Worker w : workers) {
            Thread t = w.thread;
              //如果當(dāng)前線程沒(méi)有設(shè)置中斷標(biāo)志并且可以獲取Worker自己的鎖
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                      //設(shè)置中斷標(biāo)志
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
              //執(zhí)行一次,清理空閑線程。
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

我們看到當(dāng)我們調(diào)用shutdown方法的時(shí)候,只是將空閑的線程給設(shè)置了中斷標(biāo)識(shí),也就是活躍正在執(zhí)行任務(wù)的線程并沒(méi)有設(shè)置中斷標(biāo)識(shí),直到將任務(wù)全部執(zhí)行完后才會(huì)逐步清理線程操作,我們還記的在getTask中的方法里面有這樣一段代碼:

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
}

判斷是否是狀態(tài)>=SHUTDOWN,并且隊(duì)列為空時(shí),將線程池?cái)?shù)量進(jìn)行減少操作,內(nèi)部進(jìn)行CAS操作,直到CAS操作成功為止,并且返回null,返回null后,會(huì)調(diào)用processWorkerExit(w, false);清理Workers線程信息,并且嘗試將線程設(shè)置為TERMINATED狀態(tài),上面是對(duì)所有shutdown方法的分析,下面來(lái)看一下shutdownNow方法并且比較兩個(gè)之間的區(qū)別:

public List shutdownNow() {
    List tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
          //權(quán)限檢查
        checkShutdownAccess();
          //設(shè)置線程池狀態(tài)為STOP,如果狀態(tài)已經(jīng)是大于等于STOP則直接返回
        advanceRunState(STOP);
          //這里是和SHUTDOWN區(qū)別的地方,這里是強(qiáng)制進(jìn)行中斷操作
        interruptWorkers();
          //將為完成任務(wù)復(fù)制到list集合中
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
      //嘗試修改線程池狀態(tài)為TERMINATED
    tryTerminate();
    return tasks;
}

shutdownNow方法返回了未完成的任務(wù)信息列表tasks = drainQueue();,其實(shí)該方法和shutdown方法主要的區(qū)別在于一下幾點(diǎn)內(nèi)容:

shutdownNow方法將線程池狀態(tài)設(shè)置為STOP,而shutdown則將狀態(tài)修改為SHUTDOWN

shutdownNow方法將工作任務(wù)進(jìn)行中斷操作,也就是說(shuō)如果工作線程在工作也會(huì)被中斷,而shutdown則是先嘗試獲取鎖如果獲得鎖成功則進(jìn)行中斷標(biāo)志設(shè)置,也就是中斷操作,如果沒(méi)有獲取到鎖則等待進(jìn)行完成后自動(dòng)退出。

shutdownNow方法返回未完成的任務(wù)列表。

下面代碼是shutDownNowinterruptWorkers方法:

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
              //直接進(jìn)行中斷操作。
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

內(nèi)部調(diào)用了WorkerinterruptIfStarted方法,方法內(nèi)部是針對(duì)線程進(jìn)行中斷操作,但是中斷的前提條件是AQS的state狀態(tài)必須大于等于0,如果狀態(tài)為-1的則不會(huì)被中斷,但是如果任務(wù)運(yùn)行起來(lái)的時(shí)候在runWorker中則不會(huì)執(zhí)行任務(wù),因?yàn)榫€程池狀態(tài)為STOP,如果線程池狀態(tài)為STOP則會(huì)中斷線程,下面代碼是Worker中的interruptIfStarted:

void interruptIfStarted() {
    Thread t;
      //當(dāng)前Worker鎖狀態(tài)大于等于0并且線程沒(méi)有被中斷
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
拒絕策略

JDK內(nèi)置的拒絕策略如下:

AbortPolicy策略:該策略會(huì)直接拋出異常,阻止系統(tǒng)正常工作

CallerRunsPolicy策略:只要線程池沒(méi)有關(guān)閉線程池狀態(tài)是RUNNING狀態(tài),該略略直接調(diào)用線程中運(yùn)行當(dāng)前被丟棄的任務(wù)

DiscardOledestPolicy策略:該策略將丟棄最老的一個(gè)請(qǐng)求,也就是即將被執(zhí)行的第一個(gè)任務(wù),并嘗試再次提交任務(wù)

DiscardPolicy策略:該策略默默丟棄無(wú)法處理的任務(wù),不予任何處理。

5.png

總結(jié)

首先先上一張圖,針對(duì)這張圖來(lái)進(jìn)行總結(jié):
4.png

主線程進(jìn)行線程池的調(diào)用,線程池執(zhí)行execute方法

線程池通過(guò)addWorker進(jìn)行創(chuàng)建線程,并將線程放入到線程池中,這里我們看到第二步是將線程添加到核心線程中,其實(shí)線程池內(nèi)部不分核心線程和非核心線程,只是根據(jù)corePoolSize和maximumPoolSize設(shè)置的大小來(lái)進(jìn)行區(qū)分,因?yàn)槌^(guò)corePoolSize的線程會(huì)被回收,至于回收那些線程,是根據(jù)線程獲取任務(wù)的時(shí)候進(jìn)行判斷,當(dāng)前線程池?cái)?shù)量大于corePoolSize,或者指定了allowCoreThreadTimeOut為true,則他等待一定時(shí)間后會(huì)返回,不會(huì)一直等待

當(dāng)線程池的數(shù)量達(dá)到corePoolSize時(shí),線程池首先會(huì)將任務(wù)添加到隊(duì)列中

當(dāng)隊(duì)列中任務(wù)也達(dá)到了隊(duì)列設(shè)置的最大值時(shí),它會(huì)創(chuàng)建新的線程,注意的是此時(shí)的線程數(shù)量已經(jīng)超過(guò)了corePoolSize,但是沒(méi)有達(dá)到maximumPoolSize最大值。

當(dāng)線程池的線程數(shù)量達(dá)到了maximumPoolSize,則會(huì)相應(yīng)拒絕策略。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/77783.html

相關(guān)文章

  • 淺析Java線程 ExecutorService

    摘要:創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。 ExecutorService是Java中對(duì)線程池定義的一個(gè)接口,它java.util.concurrent包中. 創(chuàng)建一個(gè)什么樣的ExecutorService的實(shí)例(即線程池)需要g根據(jù)具體應(yīng)用場(chǎng)景而定,不過(guò)Java給我們提供了一個(gè)Executors工廠類,它可以幫助...

    dinfer 評(píng)論0 收藏0
  • Java相關(guān)

    摘要:本文是作者自己對(duì)中線程的狀態(tài)線程間協(xié)作相關(guān)使用的理解與總結(jié),不對(duì)之處,望指出,共勉。當(dāng)中的的數(shù)目而不是已占用的位置數(shù)大于集合番一文通版集合番一文通版垃圾回收機(jī)制講得很透徹,深入淺出。 一小時(shí)搞明白自定義注解 Annotation(注解)就是 Java 提供了一種元程序中的元素關(guān)聯(lián)任何信息和著任何元數(shù)據(jù)(metadata)的途徑和方法。Annotion(注解) 是一個(gè)接口,程序可以通過(guò)...

    wangtdgoodluck 評(píng)論0 收藏0
  • Java深入-框架技巧

    摘要:從使用到原理學(xué)習(xí)線程池關(guān)于線程池的使用,及原理分析分析角度新穎面向切面編程的基本用法基于注解的實(shí)現(xiàn)在軟件開發(fā)中,分散于應(yīng)用中多出的功能被稱為橫切關(guān)注點(diǎn)如事務(wù)安全緩存等。 Java 程序媛手把手教你設(shè)計(jì)模式中的撩妹神技 -- 上篇 遇一人白首,擇一城終老,是多么美好的人生境界,她和他歷經(jīng)風(fēng)雨慢慢變老,回首走過(guò)的點(diǎn)點(diǎn)滴滴,依然清楚的記得當(dāng)初愛(ài)情萌芽的模樣…… Java 進(jìn)階面試問(wèn)題列表 -...

    chengtao1633 評(píng)論0 收藏0
  • java學(xué)習(xí)(四) —— 內(nèi)存分配淺析

    摘要:內(nèi)存分配解析四方法執(zhí)行完畢,立即釋放局部變量所占用的棧空間。內(nèi)存分配解析五調(diào)用對(duì)象的方法,以實(shí)例為參數(shù)。堆和棧的小結(jié)以上就是程序運(yùn)行時(shí)內(nèi)存分配的大致情況。 前言 java中有很多類型的變量、靜態(tài)變量、全局變量及對(duì)象等,這些變量在java運(yùn)行的時(shí)候到底是如何分配內(nèi)存的呢?接下來(lái)有必要對(duì)此進(jìn)行一些探究。 基本知識(shí)概念: (1)寄存器:最快的存儲(chǔ)區(qū), 由編譯器根據(jù)需求進(jìn)行分配,我們?cè)诔绦?..

    henry14 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<