摘要:主要的實(shí)現(xiàn)實(shí)際上運(yùn)行還是一個(gè),它對(duì)做了一個(gè)封裝,讓開(kāi)發(fā)人員可以從其中獲取返回值是有狀態(tài)的共種狀態(tài),四種狀態(tài)變換的可能和的區(qū)別通過(guò)方法調(diào)用有返回值可以拋異常結(jié)果的實(shí)現(xiàn)原理判斷狀態(tài)非狀態(tài)則直接進(jìn)入返回結(jié)果處于狀態(tài),則進(jìn)入等待流程獲
主要的實(shí)現(xiàn)FutureTask
# FutureTask實(shí)際上運(yùn)行還是一個(gè)runnable,它對(duì)callable做了一個(gè)封裝,讓開(kāi)發(fā)人員可以從其中獲取返回值; FutrueTask是有狀態(tài)的 共7種狀態(tài),四種狀態(tài)變換的可能 NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> COMPLETING -> NORMAL NEW -> INTERRUPTING -> INTERRUPTEDCallable和runnable的區(qū)別
0. 通過(guò)call方法調(diào)用; 1. 有返回值 2. 可以拋異常get結(jié)果的實(shí)現(xiàn)原理
1. 判斷狀態(tài); 2. 非NEW,COMPLETING狀態(tài)則直接 進(jìn)入report返回結(jié)果; 3. 處于NEW,COMPLETING狀態(tài),則進(jìn)入等待awaitDone();3.x awaitDone 流程
3.1. 獲取等待的超時(shí)時(shí)間deadline; 3.2. 進(jìn)入自旋 3.3. 判斷線程是否被中斷:如果被中斷則移出等待waiters隊(duì)列;并拋出異常; 3.4. 判斷FutrueTask狀態(tài):如果">COMPLETING",代表執(zhí)行完成,進(jìn)入report; 3.5. 判斷FutrueTask狀態(tài):如果"=COMPLETING",讓出CPU執(zhí)行Thread.yield(); 3.6. 為當(dāng)前線程創(chuàng)建一個(gè)node節(jié)點(diǎn); 3.7. 將當(dāng)前線程WaitNode加入等待隊(duì)列waiters中; 3.8. 判斷是否超時(shí); 3.9. 通過(guò)LockSupport.park掛起線程,等待運(yùn)行許可; 4. report返回執(zhí)行結(jié)果:如果一切正常就返回執(zhí)行結(jié)果,否則返回Exception;run具體執(zhí)行原理如下:
1. 判斷狀態(tài)是否正常,避免重復(fù)執(zhí)行; 2. 調(diào)用callable的call()方法; 3. 修改執(zhí)行狀態(tài);保存執(zhí)行結(jié)果;并通知正在等待get的線程; ## 3.x通知機(jī)制finishCompletion 3.1. 獲取所有waiters的集合; 3.2. 通過(guò)cas 拿到執(zhí)行權(quán); 3.3. 循環(huán)遍歷所有等待的線程,通過(guò)LockSupport.unpark 喚醒其執(zhí)行;Callable和Future的實(shí)現(xiàn)原理(JDK8源碼分析) 1. cancel 取消執(zhí)行
public boolean cancel(boolean mayInterruptIfRunning) { // 判斷狀態(tài):只有剛創(chuàng)建的情況下才能取消 // mayInterruptIfRunning:是否中斷當(dāng)前正在運(yùn)行這個(gè)FutureTask的線程; if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception // 如果要中斷當(dāng)前線程,則對(duì)runner發(fā)布interrupt信號(hào); if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state // 修改狀態(tài)為:已經(jīng)通知線程進(jìn)行中斷 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 通知其他在等待結(jié)果的線程 finishCompletion(); } return true; }2. run
public void run() { // 判斷狀態(tài)及設(shè)置futuretask歸屬的線程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable3. getc = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 執(zhí)行Callable result = c.call(); // 標(biāo)記為執(zhí)行成功 ran = true; } catch (Throwable ex) { result = null; // 標(biāo)記為執(zhí)行不成功 ran = false; // 設(shè)置為異常狀態(tài),并通知其他在等待結(jié)果的線程 setException(ex); } // 如果執(zhí)行成功,修改狀態(tài)為正常,并通知其他在等待結(jié)果的線程 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 如果狀態(tài)為準(zhǔn)備發(fā)起中斷信號(hào)或者已經(jīng)發(fā)出中斷信號(hào),則讓出CPU(Thread.yield()) if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果還沒(méi)執(zhí)行完,則等待 if (s <= COMPLETING) s = awaitDone(false, 0L); // 通過(guò)report取結(jié)果 return report(s); }3.1 report 取執(zhí)行結(jié)果
private V report(int s) throws ExecutionException { Object x = outcome; // 如果一切正常,則返回x(x是callable執(zhí)行的結(jié)果outcome) if (s == NORMAL) return (V)x; // 如果被取消,則拋出已取消異常 if (s >= CANCELLED) throw new CancellationException(); // 否則拋出執(zhí)行異常 throw new ExecutionException((Throwable)x); }3.2 awaitDone 等待FutureTask執(zhí)行結(jié)束
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 記錄等待超時(shí)的時(shí)間 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 多個(gè)在等待結(jié)果的線程,通過(guò)一個(gè)鏈表進(jìn)行保存,waitNode就是每個(gè)線程在鏈表中的節(jié)點(diǎn); WaitNode q = null; boolean queued = false; // 死循環(huán)...也可以說(shuō)是自旋鎖同步 for (;;) { // 判斷當(dāng)前這個(gè)調(diào)用get的線程是否被中斷 if (Thread.interrupted()) { // 將當(dāng)前線程移出隊(duì)列 removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果狀態(tài)非初創(chuàng)或執(zhí)行完畢了,則跳出循環(huán),通過(guò)report()取執(zhí)行結(jié)果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 如果狀態(tài)等于已執(zhí)行,讓出CPU執(zhí)行,等待狀態(tài)變?yōu)檎=Y(jié)束 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 如果當(dāng)前線程還沒(méi)有創(chuàng)建對(duì)象的waitNode節(jié)點(diǎn),則創(chuàng)建一個(gè) else if (q == null) q = new WaitNode(); // 如果當(dāng)前線程對(duì)應(yīng)的waitNode還沒(méi)有加入到等待鏈表中,則加入進(jìn)去; else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果有設(shè)置等待超時(shí)時(shí)間,則通過(guò)parkNanos掛起當(dāng)前線程,等待繼續(xù)執(zhí)行的信號(hào) else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } // 通過(guò)park掛起當(dāng)前線程,等待task執(zhí)行結(jié)束后給它發(fā)一個(gè)繼續(xù)執(zhí)行的信號(hào)(unpark) else LockSupport.park(this); } }4. finishCompletion 通知所有在等待結(jié)果的線程
private void finishCompletion() { // assert state > COMPLETING; // 遍歷所有正在等待執(zhí)行結(jié)果的線程 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // unpark,發(fā)布一個(gè)讓它繼續(xù)執(zhí)行的“許可” LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/69216.html
摘要:表示一個(gè)異步任務(wù)的結(jié)果,就是向線程池提交一個(gè)任務(wù)后,它會(huì)返回對(duì)應(yīng)的對(duì)象。它們分別提供兩個(gè)重要的功能阻塞當(dāng)前線程等待一段時(shí)間直到完成或者異常終止取消任務(wù)。此時(shí),線程從中返回,然后檢查當(dāng)前的狀態(tài)已經(jīng)被改變,隨后退出循環(huán)。 0 引言 前段時(shí)間需要把一個(gè)C++的項(xiàng)目port到Java中,因此時(shí)隔三年后重新熟悉了下Java。由于需要一個(gè)通用的線程池,自然而然就想到了Executors。 用了...
摘要:零前期準(zhǔn)備文章異常啰嗦且繞彎。版本版本簡(jiǎn)介是中默認(rèn)的實(shí)現(xiàn)類(lèi),常與結(jié)合進(jìn)行多線程并發(fā)操作。所以方法的主體其實(shí)就是去喚醒被阻塞的線程。本文僅為個(gè)人的學(xué)習(xí)筆記,可能存在錯(cuò)誤或者表述不清的地方,有緣補(bǔ)充 零 前期準(zhǔn)備 0 FBI WARNING 文章異常啰嗦且繞彎。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 簡(jiǎn)介 ...
摘要:在之前,它是一個(gè)備受爭(zhēng)議的關(guān)鍵字,因?yàn)樵诔绦蛑惺褂盟占骼斫夂驮矸治龊?jiǎn)稱,是后提供的面向大內(nèi)存區(qū)數(shù)到數(shù)多核系統(tǒng)的收集器,能夠?qū)崿F(xiàn)軟停頓目標(biāo)收集并且具有高吞吐量具有更可預(yù)測(cè)的停頓時(shí)間。 35 個(gè) Java 代碼性能優(yōu)化總結(jié) 優(yōu)化代碼可以減小代碼的體積,提高代碼運(yùn)行的效率。 從 JVM 內(nèi)存模型談線程安全 小白哥帶你打通任督二脈 Java使用讀寫(xiě)鎖替代同步鎖 應(yīng)用情景 前一陣有個(gè)做...
摘要:本文的源碼基于。人如其名,包含了和兩部分。而將一個(gè)任務(wù)的狀態(tài)設(shè)置成終止態(tài)只有三種方法我們將在下文的源碼解析中分析這三個(gè)方法。將棧中所有掛起的線程都喚醒后,下面就是執(zhí)行方法這個(gè)方法是一個(gè)空方 前言 系列文章目錄 有了上一篇對(duì)預(yù)備知識(shí)的了解之后,分析源碼就容易多了,本篇我們就直接來(lái)看看FutureTask的源碼。 本文的源碼基于JDK1.8。 Future和Task 在深入分析源碼之前,我...
摘要:從而可以啟動(dòng)和取消異步計(jì)算任務(wù)查詢異步計(jì)算任務(wù)是否完成和獲取異步計(jì)算任務(wù)的返回結(jié)果。原理分析在分析中我們沒(méi)有看它的父類(lèi),其中有一個(gè)方法,返回一個(gè),說(shuō)明該方法可以獲取異步任務(wù)的返回結(jié)果。 FutureTask介紹 FutureTask是一種可取消的異步計(jì)算任務(wù)。它實(shí)現(xiàn)了Future接口,代表了異步任務(wù)的返回結(jié)果。從而FutureTask可以啟動(dòng)和取消異步計(jì)算任務(wù)、查詢異步計(jì)算任務(wù)是否完成...
閱讀 3026·2021-11-17 09:33
閱讀 1762·2021-10-12 10:13
閱讀 2614·2021-09-22 15:48
閱讀 2489·2019-08-29 17:19
閱讀 2646·2019-08-26 11:50
閱讀 1626·2019-08-26 10:37
閱讀 1790·2019-08-23 16:54
閱讀 2973·2019-08-23 14:14