摘要:有三種狀態(tài)運(yùn)行關(guān)閉終止。類類,提供了一系列工廠方法用于創(chuàng)建線程池,返回的線程池都實(shí)現(xiàn)了接口。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,在提交新任務(wù),任務(wù)將會(huì)進(jìn)入等待隊(duì)列中等待。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。
這是java高并發(fā)系列第19篇文章。
本文主要內(nèi)容介紹Executor框架相關(guān)內(nèi)容
介紹Executor
介紹ExecutorService
介紹線程池ThreadPoolExecutor及案例
介紹定時(shí)器ScheduledExecutorService及案例
介紹Excecutors類的使用
介紹Future接口
介紹Callable接口
介紹FutureTask的使用
獲取異步任務(wù)的執(zhí)行結(jié)果的幾種方法
Executors框架介紹Executors框架是Doug Lea的神作,通過這個(gè)框架,可以很容易的使用線程池高效地處理并行任務(wù)。
Excecutor框架主要包含3部分的內(nèi)容:
任務(wù)相關(guān)的:包含被執(zhí)行的任務(wù)要實(shí)現(xiàn)的接口:Runnable接口或Callable接口
任務(wù)的執(zhí)行相關(guān)的:包含任務(wù)執(zhí)行機(jī)制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor框架中有兩個(gè)關(guān)鍵的類實(shí)現(xiàn)了ExecutorService接口(ThreadPoolExecutor和ScheduleThreadPoolExecutor)
異步計(jì)算結(jié)果相關(guān)的:包含接口Future和實(shí)現(xiàn)Future接口的FutureTask類
Executors框架包括:
Executor
ExecutorService
ThreadPoolExecutor
Executors
Future
Callable
FutureTask
CompletableFuture
CompletionService
ExecutorCompletionService
下面我們來一個(gè)個(gè)介紹其用途和使用方法。
Executor接口Executor接口中定義了方法execute(Runable able)接口,該方法接受一個(gè)Runable實(shí)例,他來執(zhí)行一個(gè)任務(wù),任務(wù)即實(shí)現(xiàn)一個(gè)Runable接口的類。
ExecutorService接口ExecutorService繼承于Executor接口,他提供了更為豐富的線程實(shí)現(xiàn)方法,比如ExecutorService提供關(guān)閉自己的方法,以及為跟蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行狀況而生成Future的方法。
ExecutorService有三種狀態(tài):運(yùn)行、關(guān)閉、終止。創(chuàng)建后便進(jìn)入運(yùn)行狀態(tài),當(dāng)調(diào)用了shutdown()方法時(shí),便進(jìn)入了關(guān)閉狀態(tài),此時(shí)意味著ExecutorService不再接受新的任務(wù),但是他還是會(huì)執(zhí)行已經(jīng)提交的任務(wù),當(dāng)所有已經(jīng)提交了的任務(wù)執(zhí)行完后,便達(dá)到終止?fàn)顟B(tài)。如果不調(diào)用shutdown方法,ExecutorService方法會(huì)一直運(yùn)行下去,系統(tǒng)一般不會(huì)主動(dòng)關(guān)閉。
ThreadPoolExecutor類線程池類,實(shí)現(xiàn)了ExecutorService接口中所有方法,該類也是我們經(jīng)常要用到的,非常重要,關(guān)于此類有詳細(xì)的介紹,可以移步:玩轉(zhuǎn)java中的線程池
ScheduleThreadPoolExecutor定時(shí)器ScheduleThreadPoolExecutor繼承自ScheduleThreadPoolExecutor,他主要用來延遲執(zhí)行任務(wù),或者定時(shí)執(zhí)行任務(wù)。功能和Timer類似,但是ScheduleThreadPoolExecutor更強(qiáng)大、更靈活一些。Timer后臺(tái)是單個(gè)線程,而ScheduleThreadPoolExecutor可以在創(chuàng)建的時(shí)候指定多個(gè)線程。
常用方法介紹:
schedule:延遲執(zhí)行任務(wù)1次使用ScheduleThreadPoolExecutor的schedule方法,看一下這個(gè)方法的聲明:
public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit)
3個(gè)參數(shù):command:需要執(zhí)行的任務(wù)
delay:需要延遲的時(shí)間
unit:參數(shù)2的時(shí)間單位,是個(gè)枚舉,可以是天、小時(shí)、分鐘、秒、毫秒、納秒等
示例代碼:
package com.itsoku.chat18; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018 */ public class Demo1 { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(System.currentTimeMillis()); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.schedule(() -> { System.out.println(System.currentTimeMillis() + "開始執(zhí)行"); //模擬任務(wù)耗時(shí) try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + "執(zhí)行結(jié)束"); }, 2, TimeUnit.SECONDS); } }
輸出:
1564575180457 1564575185525開始執(zhí)行 1564575188530執(zhí)行結(jié)束scheduleAtFixedRate:固定的頻率執(zhí)行任務(wù)
使用ScheduleThreadPoolExecutor的scheduleAtFixedRate方法,該方法設(shè)置了執(zhí)行周期,下一次執(zhí)行時(shí)間相當(dāng)于是上一次的執(zhí)行時(shí)間加上period,任務(wù)每次執(zhí)行完畢之后才會(huì)計(jì)算下次的執(zhí)行時(shí)間。
看一下這個(gè)方法的聲明:
public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
4個(gè)參數(shù):command:表示要執(zhí)行的任務(wù)
initialDelay:表示延遲多久執(zhí)行第一次
period:連續(xù)執(zhí)行之間的時(shí)間間隔
unit:參數(shù)2和參數(shù)3的時(shí)間單位,是個(gè)枚舉,可以是天、小時(shí)、分鐘、秒、毫秒、納秒等
假設(shè)系統(tǒng)調(diào)用scheduleAtFixedRate的時(shí)間是T1,那么執(zhí)行時(shí)間如下:
第1次:T1+initialDelay
第2次:T1+initialDelay+period
第3次:T1+initialDelay+2*period
第n次:T1+initialDelay+(n-1)*period
示例代碼:
package com.itsoku.chat18; import java.sql.Time; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018 */ public class Demo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(System.currentTimeMillis()); //任務(wù)執(zhí)行計(jì)數(shù)器 AtomicInteger count = new AtomicInteger(1); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.scheduleAtFixedRate(() -> { int currCount = count.getAndIncrement(); System.out.println(Thread.currentThread().getName()); System.out.println(System.currentTimeMillis() + "第" + currCount + "次" + "開始執(zhí)行"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + "第" + currCount + "次" + "執(zhí)行結(jié)束"); }, 1, 1, TimeUnit.SECONDS); } }
前面6次輸出結(jié)果:
1564576404181 pool-1-thread-1 1564576405247第1次開始執(zhí)行 1564576407251第1次執(zhí)行結(jié)束 pool-1-thread-1 1564576407251第2次開始執(zhí)行 1564576409252第2次執(zhí)行結(jié)束 pool-1-thread-2 1564576409252第3次開始執(zhí)行 1564576411255第3次執(zhí)行結(jié)束 pool-1-thread-1 1564576411256第4次開始執(zhí)行 1564576413260第4次執(zhí)行結(jié)束 pool-1-thread-3 1564576413260第5次開始執(zhí)行 1564576415265第5次執(zhí)行結(jié)束 pool-1-thread-2 1564576415266第6次開始執(zhí)行 1564576417269第6次執(zhí)行結(jié)束
代碼中設(shè)置的任務(wù)第一次執(zhí)行時(shí)間是系統(tǒng)啟動(dòng)之后延遲一秒執(zhí)行。后面每次時(shí)間間隔1秒,從輸出中可以看出系統(tǒng)啟動(dòng)之后過了1秒任務(wù)第一次執(zhí)行(1、3行輸出),輸出的結(jié)果中可以看到任務(wù)第一次執(zhí)行結(jié)束時(shí)間和第二次的結(jié)束時(shí)間一樣,為什么會(huì)這樣?前面有介紹,任務(wù)當(dāng)前執(zhí)行完畢之后會(huì)計(jì)算下次執(zhí)行時(shí)間,下次執(zhí)行時(shí)間為上次執(zhí)行的開始時(shí)間+period,第一次開始執(zhí)行時(shí)間是1564576405247,加1秒為1564576406247,這個(gè)時(shí)間小于第一次結(jié)束的時(shí)間了,說明小于系統(tǒng)當(dāng)前時(shí)間了,會(huì)立即執(zhí)行。
scheduleWithFixedDelay:固定的間隔執(zhí)行任務(wù)使用ScheduleThreadPoolExecutor的scheduleWithFixedDelay方法,該方法設(shè)置了執(zhí)行周期,與scheduleAtFixedRate方法不同的是,下一次執(zhí)行時(shí)間是上一次任務(wù)執(zhí)行完的系統(tǒng)時(shí)間加上period,因而具體執(zhí)行時(shí)間不是固定的,但周期是固定的,是采用相對固定的延遲來執(zhí)行任務(wù)??匆幌逻@個(gè)方法的聲明:
public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
4個(gè)參數(shù):command:表示要執(zhí)行的任務(wù)
initialDelay:表示延遲多久執(zhí)行第一次
period:表示下次執(zhí)行時(shí)間和上次執(zhí)行結(jié)束時(shí)間之間的間隔時(shí)間
unit:參數(shù)2和參數(shù)3的時(shí)間單位,是個(gè)枚舉,可以是天、小時(shí)、分鐘、秒、毫秒、納秒等
假設(shè)系統(tǒng)調(diào)用scheduleAtFixedRate的時(shí)間是T1,那么執(zhí)行時(shí)間如下:
第1次:T1+initialDelay,執(zhí)行結(jié)束時(shí)間:E1
第2次:E1+period,執(zhí)行結(jié)束時(shí)間:E2
第3次:E2+period,執(zhí)行結(jié)束時(shí)間:E3
第4次:E3+period,執(zhí)行結(jié)束時(shí)間:E4
第n次:上次執(zhí)行結(jié)束時(shí)間+period
示例代碼:
package com.itsoku.chat18; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018 */ public class Demo3 { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(System.currentTimeMillis()); //任務(wù)執(zhí)行計(jì)數(shù)器 AtomicInteger count = new AtomicInteger(1); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.scheduleWithFixedDelay(() -> { int currCount = count.getAndIncrement(); System.out.println(Thread.currentThread().getName()); System.out.println(System.currentTimeMillis() + "第" + currCount + "次" + "開始執(zhí)行"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + "第" + currCount + "次" + "執(zhí)行結(jié)束"); }, 1, 3, TimeUnit.SECONDS); } }
前幾次輸出如下:
1564578510983 pool-1-thread-1 1564578512087第1次開始執(zhí)行 1564578514091第1次執(zhí)行結(jié)束 pool-1-thread-1 1564578517096第2次開始執(zhí)行 1564578519100第2次執(zhí)行結(jié)束 pool-1-thread-2 1564578522103第3次開始執(zhí)行 1564578524105第3次執(zhí)行結(jié)束 pool-1-thread-1 1564578527106第4次開始執(zhí)行 1564578529106第4次執(zhí)行結(jié)束
延遲1秒之后執(zhí)行第1次,后面每次的執(zhí)行時(shí)間和上次執(zhí)行結(jié)束時(shí)間間隔3秒。
scheduleAtFixedRate和scheduleWithFixedDelay示例建議多看2遍。
定時(shí)任務(wù)有異常會(huì)怎么樣?示例代碼:
package com.itsoku.chat18; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018 */ public class Demo4 { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(System.currentTimeMillis()); //任務(wù)執(zhí)行計(jì)數(shù)器 AtomicInteger count = new AtomicInteger(1); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); ScheduledFuture> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { int currCount = count.getAndIncrement(); System.out.println(Thread.currentThread().getName()); System.out.println(System.currentTimeMillis() + "第" + currCount + "次" + "開始執(zhí)行"); System.out.println(10 / 0); System.out.println(System.currentTimeMillis() + "第" + currCount + "次" + "執(zhí)行結(jié)束"); }, 1, 1, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(5); System.out.println(scheduledFuture.isCancelled()); System.out.println(scheduledFuture.isDone()); } }
系統(tǒng)輸出如下內(nèi)容就再也沒有輸出了:
1564578848143 pool-1-thread-1 1564578849226第1次開始執(zhí)行 false true
先說補(bǔ)充點(diǎn)知識(shí):schedule、scheduleAtFixedRate、scheduleWithFixedDelay這幾個(gè)方法有個(gè)返回值ScheduledFuture,通過ScheduledFuture可以對執(zhí)行的任務(wù)做一些操作,如判斷任務(wù)是否被取消、是否執(zhí)行完成。
再回到上面代碼,任務(wù)中有個(gè)10/0的操作,會(huì)觸發(fā)異常,發(fā)生異常之后沒有任何現(xiàn)象,被ScheduledExecutorService內(nèi)部給吞掉了,然后這個(gè)任務(wù)再也不會(huì)執(zhí)行了,scheduledFuture.isDone()輸出true,表示這個(gè)任務(wù)已經(jīng)結(jié)束了,再也不會(huì)被執(zhí)行了。所以如果程序有異常,開發(fā)者自己注意處理一下,不然跑著跑著發(fā)現(xiàn)任務(wù)怎么不跑了,也沒有異常輸出。
取消定時(shí)任務(wù)的執(zhí)行可能任務(wù)執(zhí)行一會(huì),想取消執(zhí)行,可以調(diào)用ScheduledFuture的cancel方法,參數(shù)表示是否給任務(wù)發(fā)送中斷信號(hào)。
package com.itsoku.chat18; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018 */ public class Demo5 { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println(System.currentTimeMillis()); //任務(wù)執(zhí)行計(jì)數(shù)器 AtomicInteger count = new AtomicInteger(1); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); ScheduledFuture> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { int currCount = count.getAndIncrement(); System.out.println(Thread.currentThread().getName()); System.out.println(System.currentTimeMillis() + "第" + currCount + "次" + "開始執(zhí)行"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis() + "第" + currCount + "次" + "執(zhí)行結(jié)束"); }, 1, 1, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(5); scheduledFuture.cancel(false); TimeUnit.SECONDS.sleep(1); System.out.println("任務(wù)是否被取消:"+scheduledFuture.isCancelled()); System.out.println("任務(wù)是否已完成:"+scheduledFuture.isDone()); } }
輸出:
1564579843190 pool-1-thread-1 1564579844255第1次開始執(zhí)行 1564579846260第1次執(zhí)行結(jié)束 pool-1-thread-1 1564579847263第2次開始執(zhí)行 任務(wù)是否被取消:true 任務(wù)是否已完成:true 1564579849267第2次執(zhí)行結(jié)束
輸出中可以看到任務(wù)被取消成功了。
Executors類Executors類,提供了一系列工廠方法用于創(chuàng)建線程池,返回的線程池都實(shí)現(xiàn)了ExecutorService接口。常用的方法有:
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
創(chuàng)建一個(gè)單線程的線程池。這個(gè)線程池只有一個(gè)線程在工作,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)。如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束,那么會(huì)有一個(gè)新的線程來替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊(duì)列來緩存任務(wù),任務(wù)如果比較多,單線程如果處理不過來,會(huì)導(dǎo)致隊(duì)列堆滿,引發(fā)OOM。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
創(chuàng)建固定大小的線程池。每次提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程,直到線程達(dá)到線程池的最大大小。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,在提交新任務(wù),任務(wù)將會(huì)進(jìn)入等待隊(duì)列中等待。如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊(duì)列來緩存任務(wù),任務(wù)如果比較多,如果處理不過來,會(huì)導(dǎo)致隊(duì)列堆滿,引發(fā)OOM。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
創(chuàng)建一個(gè)可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,那么就會(huì)回收部分空閑(60秒處于等待任務(wù)到來)的線程,當(dāng)任務(wù)數(shù)增加時(shí),此線程池又可以智能的添加新線程來處理任務(wù)。此線程池的最大值是Integer的最大值(2^31-1)。內(nèi)部使用了SynchronousQueue同步隊(duì)列來緩存任務(wù),此隊(duì)列的特性是放入任務(wù)時(shí)必須要有對應(yīng)的線程獲取任務(wù),任務(wù)才可以放入成功。如果處理的任務(wù)比較耗時(shí),任務(wù)來的速度也比較快,會(huì)創(chuàng)建太多的線程引發(fā)OOM。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
創(chuàng)建一個(gè)大小無限的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。
在《阿里巴巴java開發(fā)手冊》中指出了線程資源必須通過線程池提供,不允許在應(yīng)用中自行顯示的創(chuàng)建線程,這樣一方面是線程的創(chuàng)建更加規(guī)范,可以合理控制開辟線程的數(shù)量;另一方面線程的細(xì)節(jié)管理交給線程池處理,優(yōu)化了資源的開銷。而線程池不允許使用Executors去創(chuàng)建,而要通過ThreadPoolExecutor方式,這一方面是由于jdk中Executor框架雖然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等創(chuàng)建線程池的方法,但都有其局限性,不夠靈活;另外由于前面幾種方法內(nèi)部也是通過ThreadPoolExecutor方式實(shí)現(xiàn),使用ThreadPoolExecutor有助于大家明確線程池的運(yùn)行規(guī)則,創(chuàng)建符合自己的業(yè)務(wù)場景需要的線程池,避免資源耗盡的風(fēng)險(xiǎn)。
Future、Callable接口Future接口定義了操作異步異步任務(wù)執(zhí)行一些方法,如獲取異步任務(wù)的執(zhí)行結(jié)果、取消任務(wù)的執(zhí)行、判斷任務(wù)是否被取消、判斷任務(wù)執(zhí)行是否完畢等。
Callable接口中定義了需要有返回的任務(wù)需要實(shí)現(xiàn)的方法。
@FunctionalInterface public interface Callable{ V call() throws Exception; }
比如主線程讓一個(gè)子線程去執(zhí)行任務(wù),子線程可能比較耗時(shí),啟動(dòng)子線程開始執(zhí)行任務(wù)后,主線程就去做其他事情了,過了一會(huì)才去獲取子任務(wù)的執(zhí)行結(jié)果。
獲取異步任務(wù)執(zhí)行結(jié)果示例代碼:
package com.itsoku.chat18; import java.util.concurrent.*; /** * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018 */ public class Demo6 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); Futureresult = executorService.submit(() -> { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()+",start!"); TimeUnit.SECONDS.sleep(5); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()+",end!"); return 10; }); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",結(jié)果:" + result.get()); } }
輸出:
1564581941442,main 1564581941442,pool-1-thread-1,start! 1564581946447,pool-1-thread-1,end! 1564581941442,main,結(jié)果:10
代碼中創(chuàng)建了一個(gè)線程池,調(diào)用線程池的submit方法執(zhí)行任務(wù),submit參數(shù)為Callable接口:表示需要執(zhí)行的任務(wù)有返回值,submit方法返回一個(gè)Future對象,F(xiàn)uture相當(dāng)于一個(gè)憑證,可以在任意時(shí)間拿著這個(gè)憑證去獲取對應(yīng)任務(wù)的執(zhí)行結(jié)果(調(diào)用其get方法),代碼中調(diào)用了result.get()方法之后,此方法會(huì)阻塞當(dāng)前線程直到任務(wù)執(zhí)行結(jié)束。
超時(shí)獲取異步任務(wù)執(zhí)行結(jié)果可能任務(wù)執(zhí)行比較耗時(shí),比如耗時(shí)1分鐘,我最多只能等待10秒,如果10秒還沒返回,我就去做其他事情了。
剛好get有個(gè)超時(shí)的方法,聲明如下:
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
示例代碼:
package com.itsoku.chat18; import java.util.concurrent.*; /** * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018 */ public class Demo8 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); Futureresult = executorService.submit(() -> { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()+",start!"); TimeUnit.SECONDS.sleep(5); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()+",end!"); return 10; }); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()); try { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",結(jié)果:" + result.get(3,TimeUnit.SECONDS)); } catch (TimeoutException e) { e.printStackTrace(); } executorService.shutdown(); } }
輸出:
1564583177139,main 1564583177139,pool-1-thread-1,start! java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205) at com.itsoku.chat18.Demo8.main(Demo8.java:19) 1564583182142,pool-1-thread-1,end!
任務(wù)執(zhí)行中休眠了5秒,get方法獲取執(zhí)行結(jié)果,超時(shí)時(shí)間是3秒,3秒還未獲取到結(jié)果,get觸發(fā)了TimeoutException異常,當(dāng)前線程從阻塞狀態(tài)蘇醒了。
Future其他方法介紹一下
cancel:取消在執(zhí)行的任務(wù),參數(shù)表示是否對執(zhí)行的任務(wù)發(fā)送中斷信號(hào),方法聲明如下:
boolean cancel(boolean mayInterruptIfRunning);
isCancelled:用來判斷任務(wù)是否被取消
isDone:判斷任務(wù)是否執(zhí)行完畢。
cancel方法來個(gè)示例:
package com.itsoku.chat18; import java.util.concurrent.*; /** * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018 */ public class Demo7 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); Futureresult = executorService.submit(() -> { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()+",start!"); TimeUnit.SECONDS.sleep(5); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()+",end!"); return 10; }); executorService.shutdown(); TimeUnit.SECONDS.sleep(1); result.cancel(false); System.out.println(result.isCancelled()); System.out.println(result.isDone()); TimeUnit.SECONDS.sleep(5); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",結(jié)果:" + result.get()); executorService.shutdown(); } }
輸出:
1564583031646,pool-1-thread-1,start! true true 1564583036649,pool-1-thread-1,end! 1564583037653,main Exception in thread "main" java.util.concurrent.CancellationException at java.util.concurrent.FutureTask.report(FutureTask.java:121) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at com.itsoku.chat18.Demo7.main(Demo7.java:24)
輸出2個(gè)true,表示任務(wù)已被取消,已完成,最后調(diào)用get方法會(huì)觸發(fā)CancellationException異常。
總結(jié):從上面可以看出Future、Callable接口需要結(jié)合ExecutorService來使用,需要有線程池的支持。
FutureTask類FutureTask除了實(shí)現(xiàn)Future接口,還實(shí)現(xiàn)了Runnable接口,因此FutureTask可以交給Executor執(zhí)行,也可以交給線程執(zhí)行執(zhí)行(Thread有個(gè)Runnable的構(gòu)造方法),FutureTask表示帶返回值結(jié)果的任務(wù)。
上面我們演示的是通過線程池執(zhí)行任務(wù)然后獲取執(zhí)行結(jié)果。
這次我們通過FutureTask類,自己啟動(dòng)一個(gè)線程來獲取執(zhí)行結(jié)果,示例如下:
package com.itsoku.chat18; import java.util.concurrent.*; /** * 跟著阿里p7學(xué)并發(fā),微信公眾號(hào):javacode2018 */ public class Demo9 { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTaskfutureTask = new FutureTask (()->{ System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()+",start!"); TimeUnit.SECONDS.sleep(5); System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName()+",end!"); return 10; }); System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()); new Thread(futureTask).start(); System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()); System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",結(jié)果:"+futureTask.get()); } }
輸出:
1564585122547,main 1564585122547,main 1564585122547,Thread-0,start! 1564585127549,Thread-0,end! 1564585122547,main,結(jié)果:10
大家可以回過頭去看一下上面用線程池的submit方法返回的Future實(shí)際類型正是FutureTask對象,有興趣的可以設(shè)置個(gè)斷點(diǎn)去看看。
FutureTask類還是相當(dāng)重要的,標(biāo)記一下。
下面3個(gè)類,下一篇文章進(jìn)行詳解介紹CompletableFuture
介紹CompletionService
介紹ExecutorCompletionService
java高并發(fā)系列java高并發(fā)系列 - 第1天:必須知道的幾個(gè)概念
java高并發(fā)系列 - 第2天:并發(fā)級(jí)別
java高并發(fā)系列 - 第3天:有關(guān)并行的兩個(gè)重要定律
java高并發(fā)系列 - 第4天:JMM相關(guān)的一些概念
java高并發(fā)系列 - 第5天:深入理解進(jìn)程和線程
java高并發(fā)系列 - 第6天:線程的基本操作
java高并發(fā)系列 - 第7天:volatile與Java內(nèi)存模型
java高并發(fā)系列 - 第8天:線程組
java高并發(fā)系列 - 第9天:用戶線程和守護(hù)線程
java高并發(fā)系列 - 第10天:線程安全和synchronized關(guān)鍵字
java高并發(fā)系列 - 第11天:線程中斷的幾種方式
java高并發(fā)系列 - 第12天JUC:ReentrantLock重入鎖
java高并發(fā)系列 - 第13天:JUC中的Condition對象
java高并發(fā)系列 - 第14天:JUC中的LockSupport工具類,必備技能
java高并發(fā)系列 - 第15天:JUC中的Semaphore(信號(hào)量)
java高并發(fā)系列 - 第16天:JUC中等待多線程完成的工具類CountDownLatch,必備技能
java高并發(fā)系列 - 第17天:JUC中的循環(huán)柵欄CyclicBarrier的6種使用場景
java高并發(fā)系列 - 第18天:JAVA線程池,這一篇就夠了
java高并發(fā)系列 - 第19天:JUC中的Executor框架詳解1
高并發(fā)系列連載中,感興趣的加我微信itsoku,一起交流,關(guān)注公眾號(hào):路人甲Java,每天獲取最新連載文章!
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/75737.html
摘要:方法由兩個(gè)參數(shù),表示期望的值,表示要給設(shè)置的新值。操作包含三個(gè)操作數(shù)內(nèi)存位置預(yù)期原值和新值。如果處的值尚未同時(shí)更改,則操作成功。中就使用了這樣的操作。上面操作還有一點(diǎn)是將事務(wù)范圍縮小了,也提升了系統(tǒng)并發(fā)處理的性能。 這是java高并發(fā)系列第21篇文章。 本文主要內(nèi)容 從網(wǎng)站計(jì)數(shù)器實(shí)現(xiàn)中一步步引出CAS操作 介紹java中的CAS及CAS可能存在的問題 悲觀鎖和樂觀鎖的一些介紹及數(shù)據(jù)庫...
摘要:示例執(zhí)行一批任務(wù),然后消費(fèi)執(zhí)行結(jié)果代碼如下跟著阿里學(xué)并發(fā),微信公眾號(hào)輸出代碼中傳入了一批任務(wù)進(jìn)行處理,最終將所有處理完成的按任務(wù)完成的先后順序傳遞給進(jìn)行消費(fèi)了。 這是java高并發(fā)系列第20篇文章。 本文內(nèi)容 ExecutorCompletionService出現(xiàn)的背景 介紹CompletionService接口及常用的方法 介紹ExecutorCompletionService類及...
摘要:如問到是否使用某框架,實(shí)際是是問該框架的使用場景,有什么特點(diǎn),和同類可框架對比一系列的問題。這兩個(gè)方向的區(qū)分點(diǎn)在于工作方向的側(cè)重點(diǎn)不同。 [TOC] 這是一份來自嗶哩嗶哩的Java面試Java面試 32個(gè)核心必考點(diǎn)完全解析(完) 課程預(yù)習(xí) 1.1 課程內(nèi)容分為三個(gè)模塊 基礎(chǔ)模塊: 技術(shù)崗位與面試 計(jì)算機(jī)基礎(chǔ) JVM原理 多線程 設(shè)計(jì)模式 數(shù)據(jù)結(jié)構(gòu)與算法 應(yīng)用模塊: 常用工具集 ...
摘要:我的是忙碌的一年,從年初備戰(zhàn)實(shí)習(xí)春招,年三十都在死磕源碼,三月份經(jīng)歷了阿里五次面試,四月順利收到實(shí)習(xí)。因?yàn)槲倚睦砗芮宄业哪繕?biāo)是阿里。所以在收到阿里之后的那晚,我重新規(guī)劃了接下來的學(xué)習(xí)計(jì)劃,將我的短期目標(biāo)更新成拿下阿里轉(zhuǎn)正。 我的2017是忙碌的一年,從年初備戰(zhàn)實(shí)習(xí)春招,年三十都在死磕JDK源碼,三月份經(jīng)歷了阿里五次面試,四月順利收到實(shí)習(xí)offer。然后五月懷著忐忑的心情開始了螞蟻金...
摘要:從使用到原理學(xué)習(xí)線程池關(guān)于線程池的使用,及原理分析分析角度新穎面向切面編程的基本用法基于注解的實(shí)現(xiàn)在軟件開發(fā)中,分散于應(yīng)用中多出的功能被稱為橫切關(guān)注點(diǎn)如事務(wù)安全緩存等。 Java 程序媛手把手教你設(shè)計(jì)模式中的撩妹神技 -- 上篇 遇一人白首,擇一城終老,是多么美好的人生境界,她和他歷經(jīng)風(fēng)雨慢慢變老,回首走過的點(diǎn)點(diǎn)滴滴,依然清楚的記得當(dāng)初愛情萌芽的模樣…… Java 進(jìn)階面試問題列表 -...
閱讀 2967·2021-11-23 09:51
閱讀 1646·2021-11-15 11:36
閱讀 3109·2021-10-13 09:40
閱讀 2245·2021-09-28 09:35
閱讀 13323·2021-09-22 15:00
閱讀 1439·2019-08-29 13:56
閱讀 2989·2019-08-29 13:04
閱讀 2769·2019-08-28 18:06