摘要:本文只介紹中線程池的基本使用,不會(huì)過多的涉及到線程池的原理??删彺婢€程的線程池創(chuàng)建一個(gè)可緩存線程的線程池。首先是從接口繼承到的方法使用該方法即將一個(gè)任務(wù)交給線程池去執(zhí)行。方法方法的作用是向線程池發(fā)送關(guān)閉的指令。
首先,我們?yōu)槭裁葱枰€程池?
讓我們先來了解下什么是 對(duì)象池 技術(shù)。某些對(duì)象(比如線程,數(shù)據(jù)庫連接等),它們創(chuàng)建的代價(jià)是非常大的 —— 相比于一般對(duì)象,它們創(chuàng)建消耗的時(shí)間和內(nèi)存都很大(而且這些對(duì)象銷毀的代價(jià)比一般對(duì)象也大)。所以,如果我們維護(hù)一個(gè) 池,每次使用完這些對(duì)象之后,并不銷毀它,而是將其放入池中,下次需要使用時(shí)就直接從池中取出,便可以避免這些對(duì)象的重復(fù)創(chuàng)建;同時(shí),我們可以固定 池的大小,比如設(shè)置池的大小為 N —— 即池中只保留 N 個(gè)這類對(duì)象 —— 當(dāng)池中的 N 個(gè)對(duì)象都在使用中的時(shí)候,為超出數(shù)量的請(qǐng)求設(shè)置一種策略,比如 排隊(duì)等候 或者 直接拒絕請(qǐng)求 等,從而避免頻繁的創(chuàng)建此類對(duì)象。
線程池 即對(duì)象池的一種(池中的對(duì)象為線程 Thread),類似的還有 數(shù)據(jù)庫連接池(池中對(duì)象為數(shù)據(jù)庫連接 Connection)。合理利用線程池能夠帶來三個(gè)好處(參考文末的 References[1]):
降低資源消耗,通過重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀時(shí)造成的時(shí)間和內(nèi)存上的消耗;
提升響應(yīng)速度,當(dāng)任務(wù)到達(dá)時(shí),直接使用線程池中的線程來運(yùn)行任務(wù),使得任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行;
提高線程的可管理性,線程是開銷很大的對(duì)象,如果無限制的創(chuàng)建線程,不僅會(huì)快速消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性;而使用線程池可以對(duì)線程進(jìn)行統(tǒng)一的分配和調(diào)控。
本文只介紹 Java 中線程池的基本使用,不會(huì)過多的涉及到線程池的原理。如果有興趣的讀者需要深入理解線程池的實(shí)現(xiàn)原理,可以參考文末的 References。
JDK 中線程池的基礎(chǔ)架構(gòu)如下:
執(zhí)行器 Executor 是頂級(jí)接口,只包含了一個(gè) execute 方法,用來執(zhí)行一個(gè) Runnable 任務(wù):
執(zhí)行器服務(wù) ExecutorService 接口繼承了 Executor 接口,ExecutorService 是所有線程池的基礎(chǔ)接口,它定義了 JDK 中線程池應(yīng)該實(shí)現(xiàn)的基本方法:
線程池執(zhí)行器 ThreadPoolExecutor 是基礎(chǔ)線程池的核心實(shí)現(xiàn),并且可以通過定制 ThreadPoolExecutor 的構(gòu)造參數(shù)或者繼承 ThreadPoolExecutor,實(shí)現(xiàn)自己的線程池;
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor,是能執(zhí)行周期性任務(wù)或定時(shí)任務(wù)的線程池;
ForkJoinPool 是 JDK1.7 時(shí)添加的類,作為對(duì) Fork/Join 型線程池的實(shí)現(xiàn)。
本文只介紹 ThreadPoolExecutor 線程池的使用,ScheduledThreadPoolExecutor 和 ForkJoinPool 會(huì)在之后的文章中介紹。
查看 ThreadPoolExecutor 的源碼可知,在 ThreadPoolExecutor 的內(nèi)部,將每個(gè)池中的線程包裝為了一個(gè) Worker:
然后在 ThreadPoolExecutor 中定義了一個(gè) HashSet
設(shè)置一個(gè)合適的線程池(即自定義 ThreadPoolExecutor)是比較麻煩的,因此 JDK 通過 Executors 這個(gè)工廠類為我們提供了一些預(yù)先定義好的線程池:
1、固定大小的線程池
創(chuàng)建一個(gè)包含 nThreads 個(gè)工作線程的線程池,這 nThreads 個(gè)線程共享一個(gè)無界隊(duì)列(即不限制大小的隊(duì)列);當(dāng)新任務(wù)提交到線程池時(shí),如果當(dāng)前沒有空閑線程,那么任務(wù)將放入隊(duì)列中進(jìn)行等待,直到有空閑的線程來從隊(duì)列中取出該任務(wù)并運(yùn)行。
(通過 Runtime.getRuntime().availableProcessors() 可以獲得當(dāng)前機(jī)器可用的處理器個(gè)數(shù),對(duì)于計(jì)算密集型的任務(wù),固定大小的線程池的 nThreads 設(shè)置為這個(gè)值時(shí),一般能獲得最大的 CPU 使用率)
2、單線程線程池
創(chuàng)建一個(gè)只包含一個(gè)工作線程的線程池,它的功能可以簡(jiǎn)單的理解為 即 newFixedThreadPool 方法傳入?yún)?shù)為 1 的情況。但是與 newFixedThreadPool(1) 不同的是,如果線程池中這個(gè)唯一的線程意外終止,線程池會(huì)創(chuàng)建一個(gè)新線程繼續(xù)執(zhí)行之后的任務(wù)。
3、可緩存線程的線程池
創(chuàng)建一個(gè)可緩存線程的線程池。當(dāng)新任務(wù)提交到線程池時(shí),如果當(dāng)前線程池中有空閑線程可用,則使用空閑線程來運(yùn)行任務(wù),否則新建一個(gè)線程來運(yùn)行該任務(wù),并將該線程添加到線程池中;而且該線程池會(huì)終止并移除那些超過 60 秒未被使用的空閑線程。所以這個(gè)線程池表現(xiàn)得就像緩存,緩存的資源為線程,緩存的超時(shí)時(shí)間為 60 秒。根據(jù) JDK 的文檔,當(dāng)任務(wù)的運(yùn)行時(shí)間都較短的時(shí)候,該線程池有利于提高性能。
我們看到每個(gè)構(gòu)造線程池的工廠方法都有一個(gè)帶 ThreadFactory 的重載形式。ThreadFactory 即線程池用來新建線程的工廠,每次線程池需要新建一個(gè)線程時(shí),調(diào)用的就是這個(gè) ThreadFactory 的 newThread 方法:
(如果不提供自定義的 ThreadFactory,那么使用的就是 DefaultThreadFactory —— Executors 內(nèi)定義的內(nèi)部類)
比如我們要為線程池中的每個(gè)線程提供一個(gè)特定的名字,那么我們就可以自定義 ThreadFactory 并重寫其 newThread 方法:
public class SimpleThreadFactory implements ThreadFactory { private AtomicInteger id = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("Test_Thread-" + id.getAndIncrement()); return thread; } }
通過 JDK 的源碼我們可以知道,以上三種線程池的實(shí)現(xiàn)都是基于 ThreadPoolExecutor:
下面我們來看一下線程池的基礎(chǔ)接口 ExecutorService 中每個(gè)方法的含義。
首先是從 Executor 接口繼承到的 execute 方法:
使用該方法即將一個(gè) Runnable 任務(wù)交給線程池去執(zhí)行。
submit 方法:
submit 方法會(huì)提交一個(gè)任務(wù)去給線程池執(zhí)行,該任務(wù)可以是帶返回結(jié)果的 Callable
向線程池提交一個(gè) Runnable 或者 Callable
將 任務(wù) 作為參數(shù)使用 newTaskFor 方法構(gòu)造出 FutureTask
(由 上一篇文章 可知,FutureTask
![newTaskFor 方法][19]
線程池使用 execute 方法將 FutureTask
然后 Worker 執(zhí)行任務(wù)(即運(yùn)行 run 方法),在任務(wù)完成后,為 Future
通過 Future
invokeAll 方法:
invokeAll 方法可以一次執(zhí)行多個(gè)任務(wù),但它并不同等于多次調(diào)用 submit 方法。submit 方法是非阻塞的,每次調(diào)用 submit 方法提交任務(wù)到線程池之后,會(huì)立即返回與任務(wù)相關(guān)聯(lián)的 Future
而 invokeAll 方法是阻塞的,只有當(dāng)提交的多個(gè)任務(wù)都執(zhí)行完畢之后,invokeAll 方法才會(huì)返回,執(zhí)行結(jié)果會(huì)以List
invokeAny 方法:
invokeAny 方法也是阻塞的,與 invokeAll 方法的不同之處在于,當(dāng)所提交的一組任務(wù)中的任何一個(gè)任務(wù)完成之后,invokeAny 方法便會(huì)返回(返回的結(jié)果便是那個(gè)已經(jīng)完成的任務(wù)的返回值),而其他任務(wù)會(huì)被取消(中斷)。
舉一個(gè) invokeAny 使用的例子:電腦有 C、D、E、F 四個(gè)盤,我們需要找一個(gè)文件,但是我們不知道這個(gè)文件位于哪個(gè)盤中,我們便可以使用 invokeAny,并提交四個(gè)任務(wù)(對(duì)應(yīng)于四個(gè)線程)分別查找 C、D、E、F 四個(gè)盤,如果哪個(gè)線程找到了這個(gè)文件,那么此時(shí) invokeAny 便停止阻塞并返回結(jié)果,同時(shí)取消其他任務(wù)。
shutdown 方法:
shutdown 方法的作用是向線程池發(fā)送關(guān)閉的指令。一旦在線程池上調(diào)用 shutdown 方法之后,線程池便不能再接受新的任務(wù);如果此時(shí)還向線程池提交任務(wù),那么將會(huì)拋出 RejectedExecutionException 異常。之后線程池不會(huì)立刻關(guān)閉,直到之前已經(jīng)提交到線程池中的所有任務(wù)(包括正在運(yùn)行的任務(wù)和在隊(duì)列中等待的任務(wù))都已經(jīng)處理完成,才會(huì)關(guān)閉。
shutdownNow 方法:
與 shutdown 不同,shutdownNow 會(huì)立即關(guān)閉線程池 —— 當(dāng)前在線程池中運(yùn)行的任務(wù)會(huì)全部被取消,然后返回線程池中所有正在等待的任務(wù)。
(值得注意的是,我們 必須顯式的關(guān)閉線程池,否則線程池不會(huì)自己關(guān)閉)
awaitTermination 方法:
awaitTermination 可以用來判斷線程池是否已經(jīng)關(guān)閉。調(diào)用 awaitTermination 之后,在 timeout 時(shí)間內(nèi),如果線程池沒有關(guān)閉,則阻塞當(dāng)前線程,否則返回 true;當(dāng)超過 timeout 的時(shí)間后,若線程池已經(jīng)關(guān)閉則返回 true,否則返回 false。該方法一般這樣使用:
任務(wù)全部提交完畢之后,我們調(diào)用 shutdown 方法向線程池發(fā)送關(guān)閉的指令;
然后我們通過 awaitTermination 來檢測(cè)到線程池是否已經(jīng)關(guān)閉,可以得知線程池中所有的任務(wù)是否已經(jīng)執(zhí)行完畢;
線程池執(zhí)行完已經(jīng)提交的所有任務(wù),并將自己關(guān)閉;
調(diào)用 awaitTermination 方法的線程停止阻塞,并返回 true;
isShutdown() 方法,如果線程池已經(jīng)調(diào)用 shutdown 或者 shutdownNow,則返回 true,否則返回 false;
isTerminated() 方法,如果線程池已經(jīng)調(diào)用 shutdown 并且線程池中所有的任務(wù)已經(jīng)執(zhí)行完畢,或者線程池調(diào)用了 shutdownNow,則返回 true,否則返回 false。
通過以上介紹,我們已經(jīng)了解了 ExecutorService 中所有方法的功能,現(xiàn)在讓我們來實(shí)踐 ExecutorService 的功能。
我們繼續(xù)使用 上一篇文章 的兩個(gè)例子中的任務(wù),首先是任務(wù)類型為 Runnable 的情況:
import java.util.*; import java.util.concurrent.*; public class RunnableTest { public static void main(String[] args) throws Exception { System.out.println("使用線程池運(yùn)行 Runnable 任務(wù):"); ExecutorService threadPool = Executors.newFixedThreadPool(5); // 創(chuàng)建大小固定為 5 的線程池 Listtasks = new ArrayList<>(10); for (int i = 0; i < 10; i++) { AccumRunnable task = new AccumRunnable(i * 10 + 1, (i + 1) * 10); tasks.add(task); threadPool.execute(task); // 讓線程池執(zhí)行任務(wù) task } threadPool.shutdown(); // 向線程池發(fā)送關(guān)閉的指令,等到已經(jīng)提交的任務(wù)都執(zhí)行完畢之后,線程池會(huì)關(guān)閉 threadPool.awaitTermination(1, TimeUnit.HOURS); // 等待線程池關(guān)閉,等待的最大時(shí)間為 1 小時(shí) int total = 0; for (AccumRunnable task : tasks) { total += task.getResult(); // 調(diào)用在 AccumRunnable 定義的 getResult 方法獲得返回的結(jié)果 } System.out.println("Total: " + total); } static final class AccumRunnable implements Runnable { private final int begin; private final int end; private int result; public AccumRunnable(int begin, int end) { this.begin = begin; this.end = end; } @Override public void run() { result = 0; try { for (int i = begin; i <= end; i++) { result += i; Thread.sleep(100); } } catch (InterruptedException ex) { ex.printStackTrace(System.err); } System.out.printf("(%s) - 運(yùn)行結(jié)束,結(jié)果為 %d ", Thread.currentThread().getName(), result); } public int getResult() { return result; } } }
運(yùn)行結(jié)果:
可以看到 NetBeans 給出的運(yùn)行時(shí)間為 2 秒 —— 因?yàn)槊總€(gè)任務(wù)需要 1 秒的時(shí)間,而線程池中的線程個(gè)數(shù)固定為 5 個(gè),所以線程池最多同時(shí)處理 5 個(gè)任務(wù),因而 10 個(gè)任務(wù)總共需要 2 秒的運(yùn)行時(shí)間。
任務(wù)類型為 Callable:
import java.util.*; import java.util.concurrent.*; public class CallableTest { public static void main(String[] args) throws Exception { System.out.println("使用線程池運(yùn)行 Callable 任務(wù):"); ExecutorService threadPool = Executors.newFixedThreadPool(5); // 創(chuàng)建大小固定為 5 的線程池 List> futures = new ArrayList<>(10); for (int i = 0; i < 10; i++) { AccumCallable task = new AccumCallable(i * 10 + 1, (i + 1) * 10); Future future = threadPool.submit(task); // 提交任務(wù) futures.add(future); } threadPool.shutdown(); // 向線程池發(fā)送關(guān)閉的指令,等到已經(jīng)提交的任務(wù)都執(zhí)行完畢之后,線程池會(huì)關(guān)閉 int total = 0; for (Future future : futures) { total += future.get(); // 阻塞,直到任務(wù)結(jié)束,返回任務(wù)的結(jié)果 } System.out.println("Total: " + total); } static final class AccumCallable implements Callable { private final int begin; private final int end; public AccumCallable(int begin, int end) { this.begin = begin; this.end = end; } @Override public Integer call() throws Exception { int result = 0; for (int i = begin; i <= end; i++) { result += i; Thread.sleep(100); } System.out.printf("(%s) - 運(yùn)行結(jié)束,結(jié)果為 %d ", Thread.currentThread().getName(), result); return result; } } }
運(yùn)行結(jié)果:
改寫上面的代碼,使用 invokeAll 來直接執(zhí)行一組任務(wù):
public static void main(String[] args) throws Exception { System.out.println("使用線程池 invokeAll 運(yùn)行一組 Callable 任務(wù):"); ExecutorService threadPool = Executors.newFixedThreadPool(5); // 創(chuàng)建大小固定為 5 的線程池 Listtasks = new ArrayList<>(10); // tasks 為一組任務(wù) for (int i = 0; i < 10; i++) { tasks.add(new AccumCallable(i * 10 + 1, (i + 1) * 10)); } List > futures = threadPool.invokeAll(tasks); // 阻塞,直到所有任務(wù)都運(yùn)行完畢 int total = 0; for (Future future : futures) { total += future.get(); // 返回任務(wù)的結(jié)果 } System.out.println("Total: " + total); threadPool.shutdown(); // 向線程池發(fā)送關(guān)閉的指令 }
運(yùn)行結(jié)果:
線程池是很強(qiáng)大而且很方便的工具,它提供了對(duì)線程進(jìn)行統(tǒng)一的分配和調(diào)控的各種功能。自 JDK1.5 時(shí) JDK 添加了線程池的功能之后,一般情況下更推薦使用線程池來編寫多線程程序,而不是直接使用 Thread。
(invokeAny 也是很實(shí)用的方法,請(qǐng)有興趣的讀者自己實(shí)踐)
References:
http://www.infoq.com/cn/artic...
http://www.cnblogs.com/absfre...
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/66387.html
摘要:一使用線程池的好處線程池提供了一種限制和管理資源包括執(zhí)行一個(gè)任務(wù)。每個(gè)線程池還維護(hù)一些基本統(tǒng)計(jì)信息,例如已完成任務(wù)的數(shù)量。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。使用無界隊(duì)列作為線程池的工作隊(duì)列會(huì)對(duì)線程池帶來的影響與相同。 歷史優(yōu)質(zhì)文章推薦: Java并發(fā)編程指南專欄 分布式系統(tǒng)的經(jīng)典基礎(chǔ)理論 可能是最漂亮的Spring事務(wù)管理詳解 面試中關(guān)于Java虛擬機(jī)(jvm)的問...
摘要:本人郵箱歡迎轉(zhuǎn)載轉(zhuǎn)載請(qǐng)注明網(wǎng)址代碼已經(jīng)全部托管有需要的同學(xué)自行下載引言在之前的例子我們要?jiǎng)?chuàng)建多個(gè)線程處理一批任務(wù)的時(shí)候我是通過創(chuàng)建線程數(shù)組或者使用線程集合來管理的但是這樣做不太好因?yàn)檫@些線程沒有被重復(fù)利用所以這里要引入線程池今天我們就講線程 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github...
摘要:本文主要內(nèi)容為簡(jiǎn)單總結(jié)中線程池的相關(guān)信息。方法簇方法簇用于創(chuàng)建固定線程數(shù)的線程池。三種常見線程池的對(duì)比上文總結(jié)了工具類創(chuàng)建常見線程池的方法,現(xiàn)對(duì)三種線程池區(qū)別進(jìn)行比較。 概述 線程可認(rèn)為是操作系統(tǒng)可調(diào)度的最小的程序執(zhí)行序列,一般作為進(jìn)程的組成部分,同一進(jìn)程中多個(gè)線程可共享該進(jìn)程的資源(如內(nèi)存等)。在單核處理器架構(gòu)下,操作系統(tǒng)一般使用分時(shí)的方式實(shí)現(xiàn)多線程;在多核處理器架構(gòu)下,多個(gè)線程能夠...
摘要:最后,我們會(huì)通過對(duì)源代碼的剖析深入了解線程池的運(yùn)行過程和具體設(shè)計(jì),真正達(dá)到知其然而知其所以然的水平。創(chuàng)建線程池既然線程池是一個(gè)類,那么最直接的使用方法一定是一個(gè)類的對(duì)象,例如。單線程線程池單線程線程 我們一般不會(huì)選擇直接使用線程類Thread進(jìn)行多線程編程,而是使用更方便的線程池來進(jìn)行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時(shí)候去獲取就可以了。甚至可以說線程池更棒,...
摘要:用于限定中線程數(shù)的最大值。該線程池中的任務(wù)隊(duì)列維護(hù)著等待執(zhí)行的對(duì)象。線程池和消息隊(duì)列筆者在實(shí)際工程應(yīng)用中,使用過多線程和消息隊(duì)列處理過異步任務(wù)。以上是筆者在學(xué)習(xí)實(shí)踐之后對(duì)于多線程和消息隊(duì)列的粗淺認(rèn)識(shí),初學(xué)者切莫混淆兩者的作用。 多線程編程很難,難點(diǎn)在于多線程代碼的執(zhí)行不是按照我們直覺上的執(zhí)行順序。所以多線程編程必須要建立起一個(gè)宏觀的認(rèn)識(shí)。 線程池是多線程編程中的一個(gè)重要概念。為了能夠更...
閱讀 4110·2023-04-26 02:07
閱讀 3738·2021-10-27 14:14
閱讀 2964·2021-10-14 09:49
閱讀 1683·2019-08-30 15:43
閱讀 2693·2019-08-29 18:33
閱讀 2430·2019-08-29 17:01
閱讀 974·2019-08-29 15:11
閱讀 675·2019-08-29 11:06