摘要:分層支持分層一種樹形結(jié)構(gòu),通過構(gòu)造函數(shù)可以指定當(dāng)前待構(gòu)造的對(duì)象的父結(jié)點(diǎn)。當(dāng)一個(gè)的參與者數(shù)量變成時(shí),如果有該有父結(jié)點(diǎn),就會(huì)將它從父結(jié)點(diǎn)中溢移除。當(dāng)首次將某個(gè)結(jié)點(diǎn)鏈接到樹中時(shí),會(huì)同時(shí)向該結(jié)點(diǎn)的父結(jié)點(diǎn)注冊(cè)一個(gè)參與者。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、Phaser簡(jiǎn)介
Phaser是JDK1.7開始引入的一個(gè)同步工具類,適用于一些需要分階段的任務(wù)的處理。它的功能與 CyclicBarrier和CountDownLatch有些類似,類似于一個(gè)多階段的柵欄,并且功能更強(qiáng)大,我們來比較下這三者的功能:
同步器 | 作用 |
---|---|
CountDownLatch | 倒數(shù)計(jì)數(shù)器,初始時(shí)設(shè)定計(jì)數(shù)器值,線程可以在計(jì)數(shù)器上等待,當(dāng)計(jì)數(shù)器值歸0后,所有等待的線程繼續(xù)執(zhí)行 |
CyclicBarrier | 循環(huán)柵欄,初始時(shí)設(shè)定參與線程數(shù),當(dāng)線程到達(dá)柵欄后,會(huì)等待其它線程的到達(dá),當(dāng)?shù)竭_(dá)柵欄的總數(shù)滿足指定數(shù)后,所有等待的線程繼續(xù)執(zhí)行 |
Phaser | 多階段柵欄,可以在初始時(shí)設(shè)定參與線程數(shù),也可以中途注冊(cè)/注銷參與者,當(dāng)?shù)竭_(dá)的參與者數(shù)量滿足柵欄設(shè)定的數(shù)量后,會(huì)進(jìn)行階段升級(jí)(advance) |
Phaser中有一些比較重要的概念,理解了這些概念才能理解Phaser的功能。
phase(階段)我們知道,在CyclicBarrier中,只有一個(gè)柵欄,線程在到達(dá)柵欄后會(huì)等待其它線程的到達(dá)。
Phaser也有柵欄,在Phaser中,柵欄的名稱叫做phase(階段),在任意時(shí)間點(diǎn),Phaser只處于某一個(gè)phase(階段),初始階段為0,最大達(dá)到Integerr.MAX_VALUE,然后再次歸零。當(dāng)所有parties參與者都到達(dá)后,phase值會(huì)遞增。
如果看過之前關(guān)于CyclicBarrier的文章,就會(huì)知道,Phaser中的phase(階段)這個(gè)概念其實(shí)和CyclicBarrier中的Generation很相似,只不過Generation沒有計(jì)數(shù)。
parties(參與者)parties(參與者)其實(shí)就是CyclicBarrier中的參與線程的概念。
CyclicBarrier中的參與者在初始構(gòu)造指定后就不能變更,而Phaser既可以在初始構(gòu)造時(shí)指定參與者的數(shù)量,也可以中途通過register、bulkRegister、arriveAndDeregister等方法注冊(cè)/注銷參與者。
arrive(到達(dá)) / advance(進(jìn)階)Phaser注冊(cè)完parties(參與者)之后,參與者的初始狀態(tài)是unarrived的,當(dāng)參與者到達(dá)(arrive)當(dāng)前階段(phase)后,狀態(tài)就會(huì)變成arrived。當(dāng)階段的到達(dá)參與者數(shù)滿足條件后(注冊(cè)的數(shù)量等于到達(dá)的數(shù)量),階段就會(huì)發(fā)生進(jìn)階(advance)——也就是phase值+1。
Termination(終止)代表當(dāng)前Phaser對(duì)象達(dá)到終止?fàn)顟B(tài),有點(diǎn)類似于CyclicBarrier中的柵欄被破壞的概念。
Tiering(分層)Phaser支持分層(Tiering) —— 一種樹形結(jié)構(gòu),通過構(gòu)造函數(shù)可以指定當(dāng)前待構(gòu)造的Phaser對(duì)象的父結(jié)點(diǎn)。之所以引入Tiering,是因?yàn)楫?dāng)一個(gè)Phaser有大量參與者(parties)的時(shí)候,內(nèi)部的同步操作會(huì)使性能急劇下降,而分層可以降低競(jìng)爭(zhēng),從而減小因同步導(dǎo)致的額外開銷。
在一個(gè)分層Phasers的樹結(jié)構(gòu)中,注冊(cè)和撤銷子Phaser或父Phaser是自動(dòng)被管理的。當(dāng)一個(gè)Phaser的參與者(parties)數(shù)量變成0時(shí),如果有該P(yáng)haser有父結(jié)點(diǎn),就會(huì)將它從父結(jié)點(diǎn)中溢移除。
關(guān)于Phaser的分層,后續(xù)我們?cè)谥vPhaser原理時(shí)會(huì)進(jìn)一步討論。
二、Phaser示例為了更好的理解Phaser的功能,我們來看幾個(gè)示例:
示例一通過Phaser控制多個(gè)線程的執(zhí)行時(shí)機(jī):有時(shí)候我們希望所有線程到達(dá)指定點(diǎn)后再同時(shí)開始執(zhí)行,我們可以利用CyclicBarrier或CountDownLatch來實(shí)現(xiàn),這里給出使用Phaser的版本。
public class PhaserTest1 { public static void main(String[] args) { Phaser phaser = new Phaser(); for (int i = 0; i < 10; i++) { phaser.register(); // 注冊(cè)各個(gè)參與者線程 new Thread(new Task(phaser), "Thread-" + i).start(); } } } class Task implements Runnable { private final Phaser phaser; Task(Phaser phaser) { this.phaser = phaser; } @Override public void run() { int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達(dá) // do something System.out.println(Thread.currentThread().getName() + ": 執(zhí)行完任務(wù),當(dāng)前phase =" + i + ""); } }
輸出:
Thread-8: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-4: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-3: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-0: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-5: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-6: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-7: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-9: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-1: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-2: 執(zhí)行完任務(wù),當(dāng)前phase =1
以上示例中,創(chuàng)建了10個(gè)線程,并通過register方法注冊(cè)Phaser的參與者數(shù)量為10。當(dāng)某個(gè)線程調(diào)用arriveAndAwaitAdvance方法后,arrive數(shù)量會(huì)加1,如果數(shù)量沒有滿足總數(shù)(參與者數(shù)量10),當(dāng)前線程就是一直等待,當(dāng)最后一個(gè)線程到達(dá)后,所有線程都會(huì)繼續(xù)往下執(zhí)行。
注意:arriveAndAwaitAdvance方法是不響應(yīng)中斷的,也就是說即使當(dāng)前線程被中斷,arriveAndAwaitAdvance方法也不會(huì)返回或拋出異常,而是繼續(xù)等待。如果希望能夠響應(yīng)中斷,可以參考awaitAdvanceInterruptibly方法。示例二
通過Phaser實(shí)現(xiàn)開關(guān)。在以前講CountDownLatch時(shí),我們給出過以CountDownLatch實(shí)現(xiàn)開關(guān)的示例,也就是說,我們希望一些外部條件得到滿足后,然后打開開關(guān),線程才能繼續(xù)執(zhí)行,我們看下如何用Phaser來實(shí)現(xiàn)此功能。
public class PhaserTest2 { public static void main(String[] args) throws IOException { Phaser phaser = new Phaser(1); // 注冊(cè)主線程,當(dāng)外部條件滿足時(shí),由主線程打開開關(guān) for (int i = 0; i < 10; i++) { phaser.register(); // 注冊(cè)各個(gè)參與者線程 new Thread(new Task2(phaser), "Thread-" + i).start(); } // 外部條件:等待用戶輸入命令 System.out.println("Press ENTER to continue"); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); reader.readLine(); // 打開開關(guān) phaser.arriveAndDeregister(); System.out.println("主線程打開了開關(guān)"); } } class Task2 implements Runnable { private final Phaser phaser; Task2(Phaser phaser) { this.phaser = phaser; } @Override public void run() { int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達(dá) // do something System.out.println(Thread.currentThread().getName() + ": 執(zhí)行完任務(wù),當(dāng)前phase =" + i + ""); } }
輸出:
主線程打開了開關(guān) Thread-7: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-4: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-3: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-1: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-0: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-9: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-8: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-2: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-5: 執(zhí)行完任務(wù),當(dāng)前phase =1 Thread-6: 執(zhí)行完任務(wù),當(dāng)前phase =1
以上示例中,只有當(dāng)用戶按下回車之后,任務(wù)才真正開始執(zhí)行。這里主線程Main相當(dāng)于一個(gè)協(xié)調(diào)者,用來控制開關(guān)打開的時(shí)機(jī),arriveAndDeregister方法不會(huì)阻塞,該方法會(huì)將到達(dá)數(shù)加1,同時(shí)減少一個(gè)參與者數(shù)量,最終返回線程到達(dá)時(shí)的phase值。
示例三通過Phaser控制任務(wù)的執(zhí)行輪數(shù)
public class PhaserTest3 { public static void main(String[] args) throws IOException { int repeats = 3; // 指定任務(wù)最多執(zhí)行的次數(shù) Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------"); return phase + 1 >= repeats || registeredParties == 0; } }; for (int i = 0; i < 10; i++) { phaser.register(); // 注冊(cè)各個(gè)參與者線程 new Thread(new Task3(phaser), "Thread-" + i).start(); } } } class Task3 implements Runnable { private final Phaser phaser; Task3(Phaser phaser) { this.phaser = phaser; } @Override public void run() { while (!phaser.isTerminated()) { //只要Phaser沒有終止, 各個(gè)線程的任務(wù)就會(huì)一直執(zhí)行 int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達(dá) // do something System.out.println(Thread.currentThread().getName() + ": 執(zhí)行完任務(wù)"); } } }
輸出:
---------------PHASE[0],Parties[5] --------------- Thread-4: 執(zhí)行完任務(wù) Thread-1: 執(zhí)行完任務(wù) Thread-2: 執(zhí)行完任務(wù) Thread-3: 執(zhí)行完任務(wù) Thread-0: 執(zhí)行完任務(wù) ---------------PHASE[1],Parties[5] --------------- Thread-0: 執(zhí)行完任務(wù) Thread-3: 執(zhí)行完任務(wù) Thread-1: 執(zhí)行完任務(wù) Thread-4: 執(zhí)行完任務(wù) Thread-2: 執(zhí)行完任務(wù) ---------------PHASE[2],Parties[5] --------------- Thread-2: 執(zhí)行完任務(wù) Thread-4: 執(zhí)行完任務(wù) Thread-1: 執(zhí)行完任務(wù) Thread-0: 執(zhí)行完任務(wù) Thread-3: 執(zhí)行完任務(wù)
以上示例中,我們?cè)趧?chuàng)建Phaser對(duì)象時(shí),覆寫了onAdvance方法,這個(gè)方法類似于CyclicBarrier中的barrierAction任務(wù)。
也就是說,當(dāng)最后一個(gè)參與者到達(dá)時(shí),會(huì)觸發(fā)onAdvance方法,入?yún)?strong>phase表示到達(dá)時(shí)的phase值,registeredParties表示到達(dá)時(shí)的參與者數(shù)量,返回true表示需要終止Phaser。
我們通過phase + 1 >= repeats ,來控制階段(phase)數(shù)的上限為2(從0開始計(jì)),最終控制了每個(gè)線程的執(zhí)行任務(wù)次數(shù)為repeats次。
示例四Phaser支持分層功能,我們先來考慮下如何用利用Phaser的分層來實(shí)現(xiàn)高并發(fā)時(shí)的優(yōu)化,在示例三中,我們其實(shí)創(chuàng)建了10個(gè)任務(wù),然后10個(gè)線程共用一個(gè)Phaser對(duì)象,如下圖:
如果任務(wù)數(shù)繼續(xù)增大,那么同步產(chǎn)生的開銷會(huì)非常大,利用Phaser分層的功能,我們可以限定每個(gè)Phaser對(duì)象的最大使用線程(任務(wù)數(shù)),如下圖:
可以看到,上述Phasers其實(shí)構(gòu)成了一顆多叉樹,如果任務(wù)數(shù)繼續(xù)增多,還可以將Phaser的葉子結(jié)點(diǎn)繼續(xù)分裂,然后將分裂出的子結(jié)點(diǎn)供工作線程使用。
public class PhaserTest4 { private static final int TASKS_PER_PHASER = 4; // 每個(gè)Phaser對(duì)象對(duì)應(yīng)的工作線程(任務(wù))數(shù) public static void main(String[] args) throws IOException { int repeats = 3; // 指定任務(wù)最多執(zhí)行的次數(shù) Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------"); return phase + 1 >= repeats || registeredParties == 0; } }; Tasker[] taskers = new Tasker[10]; build(taskers, 0, taskers.length, phaser); // 根據(jù)任務(wù)數(shù),為每個(gè)任務(wù)分配Phaser對(duì)象 for (int i = 0; i < taskers.length; i++) { // 執(zhí)行任務(wù) Thread thread = new Thread(taskers[i]); thread.start(); } } private static void build(Tasker[] taskers, int lo, int hi, Phaser phaser) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(taskers, i, j, new Phaser(phaser)); } } else { for (int i = lo; i < hi; ++i) taskers[i] = new Tasker(i, phaser); } } } class Task4 implements Runnable { private final Phaser phaser; Task4(Phaser phaser) { this.phaser = phaser; this.phaser.register(); } @Override public void run() { while (!phaser.isTerminated()) { //只要Phaser沒有終止, 各個(gè)線程的任務(wù)就會(huì)一直執(zhí)行 int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達(dá) // do something System.out.println(Thread.currentThread().getName() + ": 執(zhí)行完任務(wù)"); } } }
輸出: ---------------PHASE[0],Parties[3] --------------- Thread-9: 執(zhí)行完任務(wù) Thread-6: 執(zhí)行完任務(wù) Thread-5: 執(zhí)行完任務(wù) Thread-4: 執(zhí)行完任務(wù) Thread-1: 執(zhí)行完任務(wù) Thread-0: 執(zhí)行完任務(wù) Thread-7: 執(zhí)行完任務(wù) Thread-8: 執(zhí)行完任務(wù) Thread-2: 執(zhí)行完任務(wù) Thread-3: 執(zhí)行完任務(wù) ---------------PHASE[1],Parties[3] --------------- Thread-3: 執(zhí)行完任務(wù) Thread-7: 執(zhí)行完任務(wù) Thread-0: 執(zhí)行完任務(wù) Thread-1: 執(zhí)行完任務(wù) Thread-5: 執(zhí)行完任務(wù) Thread-8: 執(zhí)行完任務(wù) Thread-2: 執(zhí)行完任務(wù) Thread-9: 執(zhí)行完任務(wù) Thread-6: 執(zhí)行完任務(wù) Thread-4: 執(zhí)行完任務(wù) ---------------PHASE[2],Parties[3] --------------- Thread-4: 執(zhí)行完任務(wù) Thread-2: 執(zhí)行完任務(wù) Thread-8: 執(zhí)行完任務(wù) Thread-0: 執(zhí)行完任務(wù) Thread-3: 執(zhí)行完任務(wù) Thread-9: 執(zhí)行完任務(wù) Thread-6: 執(zhí)行完任務(wù) Thread-7: 執(zhí)行完任務(wù) Thread-1: 執(zhí)行完任務(wù) Thread-5: 執(zhí)行完任務(wù)三、Phaser原理
Phaser是本系列至今為止,內(nèi)部結(jié)構(gòu)最為復(fù)雜的同步器之一。在開始深入Phaser原理之前,我們有必要先來講講Phaser的內(nèi)部組織結(jié)構(gòu)和它的設(shè)計(jì)思想。
Phaser的內(nèi)部結(jié)構(gòu)之前我們說過,Phaser支持樹形結(jié)構(gòu),在示例四中,也給出了一個(gè)通過分層提高并發(fā)性和程序執(zhí)行效率的例子。一個(gè)復(fù)雜分層結(jié)構(gòu)的Phaser樹的內(nèi)部結(jié)構(gòu)如下圖所示:
上面圖中的幾點(diǎn)關(guān)鍵點(diǎn):
樹的根結(jié)點(diǎn)root鏈接著兩個(gè)“無鎖?!薄猅reiber Stack,用于保存等待線程(比如當(dāng)線程等待Phaser進(jìn)入下一階段時(shí),會(huì)根據(jù)當(dāng)前階段的奇偶性,把自己掛到某個(gè)棧中),所有Phaser對(duì)象都共享這兩個(gè)棧。
當(dāng)首次將某個(gè)Phaser結(jié)點(diǎn)鏈接到樹中時(shí),會(huì)同時(shí)向該結(jié)點(diǎn)的父結(jié)點(diǎn)注冊(cè)一個(gè)參與者。
為什么需要向父結(jié)點(diǎn)注冊(cè)參與者?
首先我們要明白對(duì)于Phaser來說,什么時(shí)候會(huì)發(fā)生躍遷(advance)進(jìn)入下一階段?
廢話,當(dāng)然是等它所有參與者都到達(dá)的時(shí)候。那么它所等待的參與者都包含那幾類呢?
①對(duì)于一個(gè)孤立的Phaser結(jié)點(diǎn)(也可以看成是只有一個(gè)根結(jié)點(diǎn)的樹)
其等待的參與者,就是顯式注冊(cè)的參與者,這也是最常見的情況。
比如下圖,如果有10個(gè)Task共用這個(gè)Phaser,那等待的參與者數(shù)就是10,當(dāng)10個(gè)線程都到達(dá)后,Phaser就會(huì)躍遷至下一階段。
②對(duì)于一個(gè)非孤立的Phaser葉子結(jié)點(diǎn),比如下圖中標(biāo)綠的葉子結(jié)點(diǎn)
這種情況和①一樣,子Phaser1和子Phaser2等待的參與者數(shù)是4,子Phaser3等待的參與者數(shù)是2。
③對(duì)于一個(gè)非孤立非葉子的Phaser結(jié)點(diǎn),比如上圖中標(biāo)藍(lán)色的結(jié)點(diǎn)
這是最特殊的一種情況,這也是Phaser同步器關(guān)于分層的主要設(shè)計(jì)思路。
這種情況,結(jié)點(diǎn)所等待的參與者數(shù)目包含兩部分:
直接顯式注冊(cè)的參與者(通過構(gòu)造器或register方法)?!扔?
子結(jié)點(diǎn)的數(shù)目?!扔?
也就是說在上圖中,當(dāng)左一的子Phaser1的4個(gè)參與者都到達(dá)后,它會(huì)通知父結(jié)點(diǎn)Phaser,自己的狀態(tài)已經(jīng)OK了,這時(shí)Phaser會(huì)認(rèn)為子Phaser1已經(jīng)準(zhǔn)備就緒,會(huì)將自己的到達(dá)者數(shù)量加1,同理,當(dāng)子Phaser2和子Phaser3的所有參與者分別到達(dá)后,它們也會(huì)依次通知Phaser,只有當(dāng)Phaser(根結(jié)點(diǎn))的到達(dá)者數(shù)量為3時(shí),才會(huì)釋放“無鎖?!敝械却木€程,并將階段數(shù)phase增加1。
這是一種層層遞歸的設(shè)計(jì),只要當(dāng)根結(jié)點(diǎn)的所有參與者都到達(dá)后(也就是到達(dá)參數(shù)者數(shù)等于其子結(jié)點(diǎn)數(shù)),所有等待線程才會(huì)放行,柵欄才會(huì)進(jìn)入下一階段。
了解了上面這些,我們?cè)賮砜碢haser的源碼。
同步狀態(tài)定義Phaser使用一個(gè)long類型來保存同步狀態(tài)值State,并按位劃分不同區(qū)域的含義,通過掩碼和位運(yùn)算進(jìn)行賦值和操作:
“無鎖?!薄猅reiber Stack,保存在Phaser樹的根結(jié)點(diǎn)中,其余所有Phaser子結(jié)點(diǎn)共享這兩個(gè)棧:
結(jié)點(diǎn)的定義非常簡(jiǎn)單,內(nèi)部保存了線程信息和Phsaer對(duì)象信息:
注意:ForkJoinPool.ManagedBlocker是當(dāng)棧包含F(xiàn)orkJoinWorkerThread類型的QNode阻塞的時(shí)候,F(xiàn)orkJoinPool內(nèi)部會(huì)增加一個(gè)工作線程來保證并行度,后續(xù)講ForkJoin框架時(shí)我們會(huì)進(jìn)行分析。
Phaser的構(gòu)造器Phaser一共有4個(gè)構(gòu)造器,可以看到,最終其實(shí)都是調(diào)用了Phaser(Phaser parent, int parties)這個(gè)構(gòu)造器。
Phaser(Phaser parent, int parties)的內(nèi)部實(shí)現(xiàn)如下,關(guān)鍵就是給當(dāng)前的Phaser對(duì)象指定父結(jié)點(diǎn)時(shí),如果當(dāng)前Phaser的參與者不為0,需要向父Phaser注冊(cè)一個(gè)參與者(代表當(dāng)前結(jié)點(diǎn)本身):
Phaser提供了兩個(gè)注冊(cè)參與者的方法:
register:注冊(cè)單個(gè)參與者
bulkRegister:批量注冊(cè)參與者
這兩個(gè)方法都很簡(jiǎn)單,內(nèi)部調(diào)用了doRegister方法:
/** * 注冊(cè)指定數(shù)目{#registrations}的參與者 */ private int doRegister(int registrations) { // 首先計(jì)算注冊(cè)后當(dāng)前State要調(diào)整的值adjust long adjust = ((long) registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (; ; ) { long s = (parent == null) ? state : reconcileState(); // reconcileState()調(diào)整當(dāng)前Phaser的State與root一致 int counts = (int) s; int parties = counts >>> PARTIES_SHIFT; // 參與者數(shù)目 int unarrived = counts & UNARRIVED_MASK; // 未到達(dá)的數(shù)目 if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); phase = (int) (s >>> PHASE_SHIFT); // 當(dāng)前Phaser所處的階段phase if (phase < 0) break; if (counts != EMPTY) { // CASE1: 當(dāng)前Phaser已經(jīng)注冊(cè)過參與者 if (parent == null || reconcileState() == s) { if (unarrived == 0) // 參與者已全部到達(dá)柵欄, 當(dāng)前Phaser正在Advance, 需要阻塞等待這一過程完成 root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) // 否則,直接更新State break; } } else if (parent == null) { // CASE2: 當(dāng)前Phaser未注冊(cè)過參與者(第一次注冊(cè)),且沒有父結(jié)點(diǎn) long next = ((long) phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) // CAS更新當(dāng)前Phaser的State值 break; } else { // CASE3: 當(dāng)前Phaser未注冊(cè)過參與者(第一次注冊(cè)),且有父結(jié)點(diǎn) synchronized (this) { if (state == s) { phase = parent.doRegister(1); // 向父結(jié)點(diǎn)注冊(cè)一個(gè)參與者 if (phase < 0) break; while (!UNSAFE.compareAndSwapLong(this, stateOffset, s, ((long) phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int) (root.state >>> PHASE_SHIFT); } break; } } } } return phase; }
doRegister方法用來給當(dāng)前Phaser對(duì)象注冊(cè)參與者,主要有三個(gè)分支:
①當(dāng)前Phaser已經(jīng)注冊(cè)過參與者
如果參與者已經(jīng)全部到達(dá)柵欄,則當(dāng)前線程需要阻塞等待(因?yàn)榇藭r(shí)phase正在變化,增加1到下一個(gè)phase),否則直接更新State。
②當(dāng)前Phaser未注冊(cè)過參與者(第一次注冊(cè)),且沒有父結(jié)點(diǎn)
這種情況最簡(jiǎn)單,直接更新當(dāng)前Phaser的State值。
③當(dāng)前Phaser未注冊(cè)過參與者(第一次注冊(cè)),且有父結(jié)點(diǎn)
說明當(dāng)前Phaser是新加入的葉子結(jié)點(diǎn),需要向父結(jié)點(diǎn)注冊(cè)自身,同時(shí)更新自身的State值。
注意: reconcileState方法比較特殊,因?yàn)楫?dāng)出現(xiàn)樹形結(jié)構(gòu)時(shí),根結(jié)點(diǎn)首先進(jìn)行phase的更新,所以需要顯式同步,使當(dāng)前結(jié)點(diǎn)和根結(jié)點(diǎn)保持一致。
另外,阻塞等待調(diào)用的是internalAwaitAdvance方法,其實(shí)就是根據(jù)當(dāng)前階段phase,將線程包裝成結(jié)點(diǎn)加入到root結(jié)點(diǎn)所指向的某個(gè)“無鎖?!敝校?/p>
/** * internalAwaitAdvance的主要邏輯就是:當(dāng)前參與者線程等待Phaser進(jìn)入下一個(gè)階段(就是phase值變化). * @return 返回新的階段 */ private int internalAwaitAdvance(int phase, QNode node) { // assert root == this; releaseWaiters(phase - 1); // 清空不用的Treiber Stack(奇偶Stack交替使用) boolean queued = false; // 入隊(duì)標(biāo)識(shí) int lastUnarrived = 0; int spins = SPINS_PER_ARRIVAL; long s; int p; while ((p = (int) ((s = state) >>> PHASE_SHIFT)) == phase) { if (node == null) { // spinning in noninterruptible mode int unarrived = (int) s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; boolean interrupted = Thread.interrupted(); if (interrupted || --spins < 0) { // need node to record intr node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } } else if (node.isReleasable()) // done or aborted break; else if (!queued) { // 將結(jié)點(diǎn)壓入棧頂 AtomicReference參與者到達(dá)并等待head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); if ((q == null || q.phase == phase) && (int) (state >>> PHASE_SHIFT) == phase) // avoid stale enq queued = head.compareAndSet(q, node); } else { try { // 阻塞等待 ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { node.wasInterrupted = true; } } } ? if (node != null) { if (node.thread != null) node.thread = null; // avoid need for unpark() if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); if (p == phase && (p = (int) (state >>> PHASE_SHIFT)) == phase) return abortWait(phase); // possibly clean up on abort } releaseWaiters(phase); return p; }
arriveAndAwaitAdvance的主要邏輯如下:
首先將同步狀態(tài)值State中的未到達(dá)參與者數(shù)量減1,然后判斷未到達(dá)參與者數(shù)量是否為0?
如果不為0,則阻塞當(dāng)前線程,以等待其他參與者到來;
如果為0,說明當(dāng)前線程是最后一個(gè)參與者,如果有父結(jié)點(diǎn)則對(duì)父結(jié)點(diǎn)遞歸調(diào)用該方法。(因?yàn)橹挥懈Y(jié)點(diǎn)的未到達(dá)參與者數(shù)目為0時(shí)),才會(huì)進(jìn)階phase。
四、Phaser類/接口聲明 類聲明 構(gòu)造器聲明 接口聲明文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/76698.html
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:線程可以調(diào)用的方法進(jìn)入阻塞,當(dāng)計(jì)數(shù)值降到時(shí),所有之前調(diào)用阻塞的線程都會(huì)釋放。注意的初始計(jì)數(shù)值一旦降到,無法重置。 showImg(https://segmentfault.com/img/remote/1460000016012041); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、CountDownLatch簡(jiǎn)介 CountDow...
摘要:二接口簡(jiǎn)介可以看做是類的方法的替代品,與配合使用。當(dāng)線程執(zhí)行對(duì)象的方法時(shí),當(dāng)前線程會(huì)立即釋放鎖,并進(jìn)入對(duì)象的等待區(qū),等待其它線程喚醒或中斷。 showImg(https://segmentfault.com/img/remote/1460000016012601); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 本系列文章中所說的juc-...
摘要:注意線程與本地操作系統(tǒng)的線程是一一映射的。固定線程數(shù)的線程池提供了兩種創(chuàng)建具有固定線程數(shù)的的方法,固定線程池在初始化時(shí)確定其中的線程總數(shù),運(yùn)行過程中會(huì)始終維持線程數(shù)量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... ...
摘要:公平策略在多個(gè)線程爭(zhēng)用鎖的情況下,公平策略傾向于將訪問權(quán)授予等待時(shí)間最長(zhǎng)的線程。使用方式的典型調(diào)用方式如下二類原理的源碼非常簡(jiǎn)單,它通過內(nèi)部類實(shí)現(xiàn)了框架,接口的實(shí)現(xiàn)僅僅是對(duì)的的簡(jiǎn)單封裝,參見原理多線程進(jìn)階七鎖框架獨(dú)占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首發(fā)于一世流云的專欄:https...
閱讀 809·2021-08-17 10:11
閱讀 1680·2019-08-30 11:15
閱讀 1115·2019-08-26 13:54
閱讀 3581·2019-08-26 11:47
閱讀 1322·2019-08-26 10:20
閱讀 2915·2019-08-23 18:35
閱讀 1282·2019-08-23 17:52
閱讀 1369·2019-08-23 16:19