摘要:更新成功返回,否則返回這個(gè)操作是原子的,不會(huì)出現(xiàn)線程安全問題,這里面涉及到這個(gè)類的操作,一級(jí)涉及到這個(gè)屬性的意義。
簡(jiǎn)單解釋一下J.U.C,是JDK中提供的并發(fā)工具包,java.util.concurrent。里面提供了很多并發(fā)編程中很常用的實(shí)用工具類,比如atomic原子操作、比如lock同步鎖、fork/join等。從Lock作為切入點(diǎn)
我想以lock作為切入點(diǎn)來(lái)講解AQS,畢竟同步鎖是解決線程安全問題的通用手段,也是我們工作中用得比較多的方式。
Lock APILock是一個(gè)接口,方法定義如下
void lock() // 如果鎖可用就獲得鎖,如果鎖不可用就阻塞直到鎖釋放 void lockInterruptibly() // 和 lock()方法相似, 但阻塞的線程可中斷,拋出 java.lang.InterruptedException異常 boolean tryLock() // 非阻塞獲取鎖;嘗試獲取鎖,如果成功返回true boolean tryLock(long timeout, TimeUnit timeUnit) //帶有超時(shí)時(shí)間的獲取鎖方法 void unlock() // 釋放鎖Lock的實(shí)現(xiàn)
實(shí)現(xiàn)Lock接口的類有很多,以下為幾個(gè)常見的鎖實(shí)現(xiàn)
ReentrantLock:表示重入鎖,它是唯一一個(gè)實(shí)現(xiàn)了Lock接口的類。重入鎖指的是線程在獲得鎖之后,再次獲取該鎖不需要阻塞,而是直接關(guān)聯(lián)一次計(jì)數(shù)器增加重入次數(shù)
ReentrantReadWriteLock:重入讀寫鎖,它實(shí)現(xiàn)了ReadWriteLock接口,在這個(gè)類中維護(hù)了兩個(gè)鎖,一個(gè)是ReadLock,一個(gè)是WriteLock,他們都分別實(shí)現(xiàn)了Lock接口。讀寫鎖是一種適合讀多寫少的場(chǎng)景下解決線程安全問題的工具,基本原則是:讀和讀不互斥、讀和寫互斥、寫和寫互斥。也就是說涉及到影響數(shù)據(jù)變化的操作都會(huì)存在互斥。
StampedLock: stampedLock是JDK8引入的新的鎖機(jī)制,可以簡(jiǎn)單認(rèn)為是讀寫鎖的一個(gè)改進(jìn)版本,讀寫鎖雖然通過分離讀和寫的功能使得讀和讀之間可以完全并發(fā),但是讀和寫是有沖突的,如果大量的讀線程存在,可能會(huì)引起寫線程的饑餓。stampedLock是一種樂觀的讀策略,使得樂觀鎖完全不會(huì)阻塞寫線程
ReentrantLock的簡(jiǎn)單實(shí)用如何在實(shí)際應(yīng)用中使用ReentrantLock呢?我們通過一個(gè)簡(jiǎn)單的demo來(lái)演示一下
public class Demo { private static int count=0; static Lock lock=new ReentrantLock(); public static void inc(){ lock.lock(); try { Thread.sleep(1); count++; } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } }
這段代碼主要做一件事,就是通過一個(gè)靜態(tài)的incr()方法對(duì)共享變量count做連續(xù)遞增,在沒有加同步鎖的情況下多線程訪問這個(gè)方法一定會(huì)存在線程安全問題。所以用到了ReentrantLock來(lái)實(shí)現(xiàn)同步鎖,并且在finally語(yǔ)句塊中釋放鎖。
那么我來(lái)引出一個(gè)問題,大家思考一下
多個(gè)線程通過lock競(jìng)爭(zhēng)鎖時(shí),當(dāng)競(jìng)爭(zhēng)失敗的鎖是如何實(shí)現(xiàn)等待以及被喚醒的呢?什么是AQS
aqs全稱為AbstractQueuedSynchronizer,它提供了一個(gè)FIFO隊(duì)列,可以看成是一個(gè)用來(lái)實(shí)現(xiàn)同步鎖以及其他涉及到同步功能的核心組件,常見的有:ReentrantLock、CountDownLatch等。
AQS是一個(gè)抽象類,主要是通過繼承的方式來(lái)使用,它本身沒有實(shí)現(xiàn)任何的同步接口,僅僅是定義了同步狀態(tài)的獲取以及釋放的方法來(lái)提供自定義的同步組件。
可以這么說,只要搞懂了AQS,那么J.U.C中絕大部分的api都能輕松掌握。
從使用層面來(lái)說,AQS的功能分為兩種:獨(dú)占和共享
獨(dú)占鎖,每次只能有一個(gè)線程持有鎖,比如前面給大家演示的ReentrantLock就是以獨(dú)占方式實(shí)現(xiàn)的互斥鎖
共享鎖,允許多個(gè)線程同時(shí)獲取鎖,并發(fā)訪問共享資源,比如ReentrantReadWriteLock
ReentrantLock的類圖仍然以ReentrantLock為例,來(lái)分析AQS在重入鎖中的使用。畢竟單純分析AQS沒有太多的含義。先理解這個(gè)類圖,可以方便我們理解AQS的原理
AQS的實(shí)現(xiàn)依賴內(nèi)部的同步隊(duì)列,也就是FIFO的雙向隊(duì)列,如果當(dāng)前線程競(jìng)爭(zhēng)鎖失敗,那么AQS會(huì)把當(dāng)前線程以及等待狀態(tài)信息構(gòu)造成一個(gè)Node加入到同步隊(duì)列中,同時(shí)再阻塞該線程。當(dāng)獲取鎖的線程釋放鎖以后,會(huì)從隊(duì)列中喚醒一個(gè)阻塞的節(jié)點(diǎn)(線程)。
AQS隊(duì)列內(nèi)部維護(hù)的是一個(gè)FIFO的雙向鏈表,這種結(jié)構(gòu)的特點(diǎn)是每個(gè)數(shù)據(jù)結(jié)構(gòu)都有兩個(gè)指針,分別指向直接的后繼節(jié)點(diǎn)和直接前驅(qū)節(jié)點(diǎn)。所以雙向鏈表可以從任意一個(gè)節(jié)點(diǎn)開始很方便的訪問前驅(qū)和后繼。每個(gè)Node其實(shí)是由線程封裝,當(dāng)線程爭(zhēng)搶鎖失敗后會(huì)封裝成Node加入到ASQ隊(duì)列中去
Node類的組成如下
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; //前驅(qū)節(jié)點(diǎn) volatile Node next; //后繼節(jié)點(diǎn) volatile Thread thread;//當(dāng)前線程 Node nextWaiter; //存儲(chǔ)在condition隊(duì)列中的后繼節(jié)點(diǎn) //是否為共享鎖 final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } //將線程構(gòu)造成一個(gè)Node,添加到等待隊(duì)列 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //這個(gè)方法會(huì)在Condition隊(duì)列使用,后續(xù)多帶帶寫一篇文章分析condition Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }釋放鎖以及添加線程對(duì)于隊(duì)列的變化 添加節(jié)點(diǎn)
當(dāng)出現(xiàn)鎖競(jìng)爭(zhēng)以及釋放鎖的時(shí)候,AQS同步隊(duì)列中的節(jié)點(diǎn)會(huì)發(fā)生變化,首先看一下添加節(jié)點(diǎn)的場(chǎng)景。
這里會(huì)涉及到兩個(gè)變化
新的線程封裝成Node節(jié)點(diǎn)追加到同步隊(duì)列中,設(shè)置prev節(jié)點(diǎn)以及修改當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)的next節(jié)點(diǎn)指向自己
通過CAS講tail重新指向新的尾部節(jié)點(diǎn)
釋放鎖移除節(jié)點(diǎn)head節(jié)點(diǎn)表示獲取鎖成功的節(jié)點(diǎn),當(dāng)頭結(jié)點(diǎn)在釋放同步狀態(tài)時(shí),會(huì)喚醒后繼節(jié)點(diǎn),如果后繼節(jié)點(diǎn)獲得鎖成功,會(huì)把自己設(shè)置為頭結(jié)點(diǎn),節(jié)點(diǎn)的變化過程如下
這個(gè)過程也是涉及到兩個(gè)變化
修改head節(jié)點(diǎn)指向下一個(gè)獲得鎖的節(jié)點(diǎn)
新的獲得鎖的節(jié)點(diǎn),將prev的指針指向null
這里有一個(gè)小的變化,就是設(shè)置head節(jié)點(diǎn)不需要用CAS,原因是設(shè)置head節(jié)點(diǎn)是由獲得鎖的線程來(lái)完成的,而同步鎖只能由一個(gè)線程獲得,所以不需要CAS保證,只需要把head節(jié)點(diǎn)設(shè)置為原首節(jié)點(diǎn)的后繼節(jié)點(diǎn),并且斷開原h(huán)ead節(jié)點(diǎn)的next引用即可
AQS的源碼分析清楚了AQS的基本架構(gòu)以后,我們來(lái)分析一下AQS的源碼,仍然以ReentrantLock為模型。
ReentrantLock的時(shí)序圖調(diào)用ReentrantLock中的lock()方法,源碼的調(diào)用過程我使用了時(shí)序圖來(lái)展現(xiàn)
從圖上可以看出來(lái),當(dāng)鎖獲取失敗時(shí),會(huì)調(diào)用addWaiter()方法將當(dāng)前線程封裝成Node節(jié)點(diǎn)加入到AQS隊(duì)列,基于這個(gè)思路,我們來(lái)分析AQS的源碼實(shí)現(xiàn)
public void lock() { sync.lock(); }
這個(gè)是獲取鎖的入口,調(diào)用sync這個(gè)類里面的方法,sync是什么呢?
abstract static class Sync extends AbstractQueuedSynchronizer
sync是一個(gè)靜態(tài)內(nèi)部類,它繼承了AQS這個(gè)抽象類,前面說過AQS是一個(gè)同步工具,主要用來(lái)實(shí)現(xiàn)同步控制。我們?cè)诶眠@個(gè)工具的時(shí)候,會(huì)繼承它來(lái)實(shí)現(xiàn)同步控制功能。
通過進(jìn)一步分析,發(fā)現(xiàn)Sync這個(gè)類有兩個(gè)具體的實(shí)現(xiàn),分別是NofairSync(非公平鎖),FailSync(公平鎖).
公平鎖 表示所有線程嚴(yán)格按照FIFO來(lái)獲取鎖
非公平鎖 表示可以存在搶占鎖的功能,也就是說不管當(dāng)前隊(duì)列上是否存在其他線程等待,新線程都有機(jī)會(huì)搶占鎖
公平鎖和非公平鎖的實(shí)現(xiàn)上的差異,我會(huì)在文章后面做一個(gè)解釋,接下來(lái)的分析仍然以非公平鎖作為主要分析邏輯。
NonfairSync.lockfinal void lock() { if (compareAndSetState(0, 1)) //通過cas操作來(lái)修改state狀態(tài),表示爭(zhēng)搶鎖的操作 setExclusiveOwnerThread(Thread.currentThread());//設(shè)置當(dāng)前獲得鎖狀態(tài)的線程 else acquire(1); //嘗試去獲取鎖 }
這段代碼簡(jiǎn)單解釋一下
由于這里是非公平鎖,所以調(diào)用lock方法時(shí),先去通過cas去搶占鎖
如果搶占鎖成功,保存獲得鎖成功的當(dāng)前線程
搶占鎖失敗,調(diào)用acquire來(lái)走鎖競(jìng)爭(zhēng)邏輯
compareAndSetState
compareAndSetState的代碼實(shí)現(xiàn)邏輯如下
// See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
這段代碼其實(shí)邏輯很簡(jiǎn)單,就是通過cas樂觀鎖的方式來(lái)做比較并替換。上面這段代碼的意思是,如果當(dāng)前內(nèi)存中的state的值和預(yù)期值expect相等,則替換為update。更新成功返回true,否則返回false. 這個(gè)操作是原子的,不會(huì)出現(xiàn)線程安全問題,這里面涉及到Unsafe這個(gè)類的操作,一級(jí)涉及到state這個(gè)屬性的意義。 **state**
當(dāng)state=0時(shí),表示無(wú)鎖狀態(tài)
當(dāng)state>0時(shí),表示已經(jīng)有線程獲得了鎖,也就是state=1,但是因?yàn)镽eentrantLock允許重入,所以同一個(gè)線程多次獲得同步鎖的時(shí)候,state會(huì)遞增,比如重入5次,那么state=5。 而在釋放鎖的時(shí)候,同樣需要釋放5次直到state=0其他線程才有資格獲得鎖
acquireprivate volatile int state;需要注意的是:不同的AQS實(shí)現(xiàn),state所表達(dá)的含義是不一樣的。
Unsafe
Unsafe類是在sun.misc包下,不屬于Java標(biāo)準(zhǔn)。但是很多Java的基礎(chǔ)類庫(kù),包括一些被廣泛使用的高性能開發(fā)庫(kù)都是基于Unsafe類開發(fā)的,比如Netty、Hadoop、Kafka等;Unsafe可認(rèn)為是Java中留下的后門,提供了一些低層次操作,如直接內(nèi)存訪問、線程調(diào)度等public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);這個(gè)是一個(gè)native方法, 第一個(gè)參數(shù)為需要改變的對(duì)象,第二個(gè)為偏移量(即之前求出來(lái)的headOffset的值),第三個(gè)參數(shù)為期待的值,第四個(gè)為更新后的值
整個(gè)方法的作用是如果當(dāng)前時(shí)刻的值等于預(yù)期值var4相等,則更新為新的期望值 var5,如果更新成功,則返回true,否則返回false;
acquire是AQS中的方法,如果CAS操作未能成功,說明state已經(jīng)不為0,此時(shí)繼續(xù)acquire(1)操作,這里大家思考一下,acquire方法中的1的參數(shù)是用來(lái)做什么呢?如果沒猜中,往前面回顧一下state這個(gè)概念
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這個(gè)方法的主要邏輯是
通過tryAcquire嘗試獲取獨(dú)占鎖,如果成功返回true,失敗返回false
如果tryAcquire失敗,則會(huì)通過addWaiter方法將當(dāng)前線程封裝成Node添加到AQS隊(duì)列尾部
acquireQueued,將Node作為參數(shù),通過自旋去嘗試獲取鎖。
如果大家看過我寫的Synchronized源碼分析的文章,就應(yīng)該能夠明白自旋存在的意義NonfairSync.tryAcquire
這個(gè)方法的作用是嘗試獲取鎖,如果成功返回true,不成功返回false
它是重寫AQS類中的tryAcquire方法,并且大家仔細(xì)看一下AQS中tryAcquire方法的定義,并沒有實(shí)現(xiàn),而是拋出異常。按照一般的思維模式,既然是一個(gè)不實(shí)現(xiàn)的模版方法,那應(yīng)該定義成abstract,讓子類來(lái)實(shí)現(xiàn)呀?大家想想為什么
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }nonfairTryAcquire
tryAcquire(1)在NonfairSync中的實(shí)現(xiàn)代碼如下
ffinal boolean nonfairTryAcquire(int acquires) { //獲得當(dāng)前執(zhí)行的線程 final Thread current = Thread.currentThread(); int c = getState(); //獲得state的值 if (c == 0) { //state=0說明當(dāng)前是無(wú)鎖狀態(tài) //通過cas操作來(lái)替換state的值改為1,大家想想為什么要用cas呢? //理由是,在多線程環(huán)境中,直接修改state=1會(huì)存在線程安全問題,你猜到了嗎? if (compareAndSetState(0, acquires)) { //保存當(dāng)前獲得鎖的線程 setExclusiveOwnerThread(current); return true; } } //這段邏輯就很簡(jiǎn)單了。如果是同一個(gè)線程來(lái)獲得鎖,則直接增加重入次數(shù) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //增加重入次數(shù) if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
獲取當(dāng)前線程,判斷當(dāng)前的鎖的狀態(tài)
如果state=0表示當(dāng)前是無(wú)鎖狀態(tài),通過cas更新state狀態(tài)的值
如果當(dāng)前線程是屬于重入,則增加重入次數(shù)
addWaiter當(dāng)tryAcquire方法獲取鎖失敗以后,則會(huì)先調(diào)用addWaiter將當(dāng)前線程封裝成Node,然后添加到AQS隊(duì)列
private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE //將當(dāng)前線程封裝成Node,并且mode為獨(dú)占鎖 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail是AQS的中表示同步隊(duì)列隊(duì)尾的屬性,剛開始為null,所以進(jìn)行enq(node)方法 Node pred = tail; if (pred != null) { //tail不為空的情況,說明隊(duì)列中存在節(jié)點(diǎn)數(shù)據(jù) node.prev = pred; //講當(dāng)前線程的Node的prev節(jié)點(diǎn)指向tail if (compareAndSetTail(pred, node)) {//通過cas講node添加到AQS隊(duì)列 pred.next = node;//cas成功,把舊的tail的next指針指向新的tail return node; } } enq(node); //tail=null,將node添加到同步隊(duì)列中 return node; }
將當(dāng)前線程封裝成Node
判斷當(dāng)前鏈表中的tail節(jié)點(diǎn)是否為空,如果不為空,則通過cas操作把當(dāng)前線程的node添加到AQS隊(duì)列
如果為空或者cas失敗,調(diào)用enq將節(jié)點(diǎn)添加到AQS隊(duì)列
enqenq就是通過自旋操作把當(dāng)前節(jié)點(diǎn)加入到隊(duì)列中
private Node enq(final Node node) { //自旋,不做過多解釋,不清楚的關(guān)注公眾號(hào)[架構(gòu)師修煉寶典] for (;;) { Node t = tail; //如果是第一次添加到隊(duì)列,那么tail=null if (t == null) { // Must initialize //CAS的方式創(chuàng)建一個(gè)空的Node作為頭結(jié)點(diǎn) if (compareAndSetHead(new Node())) //此時(shí)隊(duì)列中只一個(gè)頭結(jié)點(diǎn),所以tail也指向它 tail = head; } else { //進(jìn)行第二次循環(huán)時(shí),tail不為null,進(jìn)入else區(qū)域。將當(dāng)前線程的Node結(jié)點(diǎn)的prev指向tail,然后使用CAS將tail指向Node node.prev = t; if (compareAndSetTail(t, node)) { //t此時(shí)指向tail,所以可以CAS成功,將tail重新指向Node。此時(shí)t為更新前的tail的值,即指向空的頭結(jié)點(diǎn),t.next=node,就將頭結(jié)點(diǎn)的后續(xù)結(jié)點(diǎn)指向Node,返回頭結(jié)點(diǎn) t.next = node; return t; } } } }
假如有兩個(gè)線程t1,t2同時(shí)進(jìn)入enq方法,t==null表示隊(duì)列是首次使用,需要先初始化
另外一個(gè)線程cas失敗,則進(jìn)入下次循環(huán),通過cas操作將node添加到隊(duì)尾
到目前為止,通過addwaiter方法構(gòu)造了一個(gè)AQS隊(duì)列,并且將線程添加到了隊(duì)列的節(jié)點(diǎn)中acquireQueued
將添加到隊(duì)列中的Node作為參數(shù)傳入acquireQueued方法,這里面會(huì)做搶占鎖的操作
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();// 獲取prev節(jié)點(diǎn),若為null即刻拋出NullPointException if (p == head && tryAcquire(arg)) {// 如果前驅(qū)為head才有資格進(jìn)行鎖的搶奪 setHead(node); // 獲取鎖成功后就不需要再進(jìn)行同步操作了,獲取鎖成功的線程作為新的head節(jié)點(diǎn) //凡是head節(jié)點(diǎn),head.thread與head.prev永遠(yuǎn)為null, 但是head.next不為null p.next = null; // help GC failed = false; //獲取鎖成功 return interrupted; } //如果獲取鎖失敗,則根據(jù)節(jié)點(diǎn)的waitStatus決定是否需要掛起線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 若前面為true,則執(zhí)行掛起,待下次喚醒的時(shí)候檢測(cè)中斷的標(biāo)志 interrupted = true; } } finally { if (failed) // 如果拋出異常則取消鎖的獲取,進(jìn)行出隊(duì)(sync queue)操作 cancelAcquire(node); } }
獲取當(dāng)前節(jié)點(diǎn)的prev節(jié)點(diǎn)
如果prev節(jié)點(diǎn)為head節(jié)點(diǎn),那么它就有資格去爭(zhēng)搶鎖,調(diào)用tryAcquire搶占鎖
搶占鎖成功以后,把獲得鎖的節(jié)點(diǎn)設(shè)置為head,并且移除原來(lái)的初始化head節(jié)點(diǎn)
如果獲得鎖失敗,則根據(jù)waitStatus決定是否需要掛起線程
最后,通過cancelAcquire取消獲得鎖的操作
前面的邏輯都很好理解,主要看一下shouldParkAfterFailedAcquire這個(gè)方法和parkAndCheckInterrupt的作用
shouldParkAfterFailedAcquire從上面的分析可以看出,只有隊(duì)列的第二個(gè)節(jié)點(diǎn)可以有機(jī)會(huì)爭(zhēng)用鎖,如果成功獲取鎖,則此節(jié)點(diǎn)晉升為頭節(jié)點(diǎn)。對(duì)于第三個(gè)及以后的節(jié)點(diǎn),if (p == head)條件不成立,首先進(jìn)行shouldParkAfterFailedAcquire(p, node)操作
shouldParkAfterFailedAcquire方法是判斷一個(gè)爭(zhēng)用鎖的線程是否應(yīng)該被阻塞。它首先判斷一個(gè)節(jié)點(diǎn)的前置節(jié)點(diǎn)的狀態(tài)是否為Node.SIGNAL,如果是,是說明此節(jié)點(diǎn)已經(jīng)將狀態(tài)設(shè)置-如果鎖釋放,則應(yīng)當(dāng)通知它,所以它可以安全的阻塞了,返回true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前繼節(jié)點(diǎn)的狀態(tài) if (ws == Node.SIGNAL)//如果是SIGNAL狀態(tài),意味著當(dāng)前線程需要被unpark喚醒 return true; 如果前節(jié)點(diǎn)的狀態(tài)大于0,即為CANCELLED狀態(tài)時(shí),則會(huì)從前節(jié)點(diǎn)開始逐步循環(huán)找到一個(gè)沒有被“CANCELLED”節(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)的前節(jié)點(diǎn),返回false。在下次循環(huán)執(zhí)行shouldParkAfterFailedAcquire時(shí),返回true。這個(gè)操作實(shí)際是把隊(duì)列中CANCELLED的節(jié)點(diǎn)剔除掉。 if (ws > 0) {// 如果前繼節(jié)點(diǎn)是“取消”狀態(tài),則設(shè)置 “當(dāng)前節(jié)點(diǎn)”的 “當(dāng)前前繼節(jié)點(diǎn)” 為 “‘原前繼節(jié)點(diǎn)"的前繼節(jié)點(diǎn)”。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前繼節(jié)點(diǎn)為“0”或者“共享鎖”狀態(tài),則設(shè)置前繼節(jié)點(diǎn)為SIGNAL狀態(tài)。 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don"t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }parkAndCheckInterrupt
如果shouldParkAfterFailedAcquire返回了true,則會(huì)執(zhí)行:parkAndCheckInterrupt()方法,它是通過LockSupport.park(this)將當(dāng)前線程掛起到WATING狀態(tài),它需要等待一個(gè)中斷、unpark方法來(lái)喚醒它,通過這樣一種FIFO的機(jī)制的等待,來(lái)實(shí)現(xiàn)了Lock的操作。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
鎖的釋放 ReentrantLock.unlockLockSupport
LockSupport類是Java6引入的一個(gè)類,提供了基本的線程同步原語(yǔ)。LockSupport實(shí)際上是調(diào)用了Unsafe類里的函數(shù),歸結(jié)到Unsafe里,只有兩個(gè)函數(shù):public native void unpark(Thread jthread); public native void park(boolean isAbsolute, long time);unpark函數(shù)為線程提供“許可(permit)”,線程調(diào)用park函數(shù)則等待“許可”。這個(gè)有點(diǎn)像信號(hào)量,但是這個(gè)“許可”是不能疊加的,“許可”是一次性的。
permit相當(dāng)于0/1的開關(guān),默認(rèn)是0,調(diào)用一次unpark就加1變成了1.調(diào)用一次park會(huì)消費(fèi)permit,又會(huì)變成0。 如果再調(diào)用一次park會(huì)阻塞,因?yàn)閜ermit已經(jīng)是0了。直到permit變成1.這時(shí)調(diào)用unpark會(huì)把permit設(shè)置為1.每個(gè)線程都有一個(gè)相關(guān)的permit,permit最多只有一個(gè),重復(fù)調(diào)用unpark不會(huì)累積
加鎖的過程分析完以后,再來(lái)分析一下釋放鎖的過程,調(diào)用release方法,這個(gè)方法里面做兩件事,1,釋放鎖 ;2,喚醒park的線程
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }tryRelease
這個(gè)動(dòng)作可以認(rèn)為就是一個(gè)設(shè)置鎖狀態(tài)的操作,而且是將狀態(tài)減掉傳入的參數(shù)值(參數(shù)是1),如果結(jié)果狀態(tài)為0,就將排它鎖的Owner設(shè)置為null,以使得其它的線程有機(jī)會(huì)進(jìn)行執(zhí)行。
在排它鎖中,加鎖的時(shí)候狀態(tài)會(huì)增加1(當(dāng)然可以自己修改這個(gè)值),在解鎖的時(shí)候減掉1,同一個(gè)鎖,在可以重入后,可能會(huì)被疊加為2、3、4這些值,只有unlock()的次數(shù)與lock()的次數(shù)對(duì)應(yīng)才會(huì)將Owner線程設(shè)置為空,而且也只有這種情況下才會(huì)返回true。
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 這里是將鎖的數(shù)量減1 if (Thread.currentThread() != getExclusiveOwnerThread())// 如果釋放的線程和獲取鎖的線程不是同一個(gè),拋出非法監(jiān)視器狀態(tài)異常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 由于重入的關(guān)系,不是每次釋放鎖c都等于0, // 直到最后一次釋放鎖時(shí),才會(huì)把當(dāng)前線程釋放 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }unparkSuccessor
在方法unparkSuccessor(Node)中,就意味著真正要釋放鎖了,它傳入的是head節(jié)點(diǎn)(head節(jié)點(diǎn)是占用鎖的節(jié)點(diǎn)),當(dāng)前線程被釋放之后,需要喚醒下一個(gè)節(jié)點(diǎn)的線程
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) {//判斷后繼節(jié)點(diǎn)是否為空或者是否是取消狀態(tài), s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //然后從隊(duì)列尾部向前遍歷找到最前面的一個(gè)waitStatus小于0的節(jié)點(diǎn), 至于為什么從尾部開始向前遍歷,因?yàn)樵赿oAcquireInterruptibly.cancelAcquire方法的處理過程中只設(shè)置了next的變化,沒有設(shè)置prev的變化,在最后有這樣一行代碼:node.next = node,如果這時(shí)執(zhí)行了unparkSuccessor方法,并且向后遍歷的話,就成了死循環(huán)了,所以這時(shí)只有prev是穩(wěn)定的 s = t; } //內(nèi)部首先會(huì)發(fā)生的動(dòng)作是獲取head節(jié)點(diǎn)的next節(jié)點(diǎn),如果獲取到的節(jié)點(diǎn)不為空,則直接通過:“LockSupport.unpark()”方法來(lái)釋放對(duì)應(yīng)的被掛起的線程,這樣一來(lái)將會(huì)有一個(gè)節(jié)點(diǎn)喚醒后繼續(xù)進(jìn)入循環(huán)進(jìn)一步嘗試tryAcquire()方法來(lái)獲取鎖 if (s != null) LockSupport.unpark(s.thread); //釋放許可 }總結(jié)
通過這篇文章基本將AQS隊(duì)列的實(shí)現(xiàn)過程做了比較清晰的分析,主要是基于非公平鎖的獨(dú)占鎖實(shí)現(xiàn)。在獲得同步鎖時(shí),同步器維護(hù)一個(gè)同步隊(duì)列,獲取狀態(tài)失敗的線程都會(huì)被加入到隊(duì)列中并在隊(duì)列中進(jìn)行自旋;移出隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時(shí),同步器調(diào)用tryRelease(int arg)方法釋放同步狀態(tài),然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/72692.html
摘要:在創(chuàng)建對(duì)象時(shí),需要轉(zhuǎn)入一個(gè)值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數(shù)。當(dāng)?shù)竭_(dá)屏障的線程數(shù)小于時(shí),這些線程都會(huì)被阻塞住。當(dāng)所有線程到達(dá)屏障后,將會(huì)被更新,表示進(jìn)入新一輪的運(yùn)行輪次中。 1.簡(jiǎn)介 在分析完AbstractQueuedSynchronizer(以下簡(jiǎn)稱 AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個(gè)...
摘要:其二如果返回值等于表示當(dāng)前線程獲取共享鎖成功,但它后續(xù)的線程是無(wú)法繼續(xù)獲取的,也就是不需要把它后面等待的節(jié)點(diǎn)喚醒。 在了解了AQS獨(dú)占鎖模式以后,接下來(lái)再來(lái)看看共享鎖的實(shí)現(xiàn)原理。 原文地址:http://www.jianshu.com/p/1161... 搞清楚AQS獨(dú)占鎖的實(shí)現(xiàn)原理之后,再看共享鎖的實(shí)現(xiàn)原理就會(huì)輕松很多。兩種鎖模式之間很多通用的地方本文只會(huì)簡(jiǎn)單說明一下,就不在贅述了,...
摘要:為了避免一篇文章的篇幅過長(zhǎng),于是一些比較大的主題就都分成幾篇來(lái)講了,這篇文章是筆者所有文章的目錄,將會(huì)持續(xù)更新,以給大家一個(gè)查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷懽鞯臅r(shí)候發(fā)現(xiàn),為了弄懂一個(gè)知識(shí),不得不先去了解另外一些知識(shí),這樣以來(lái),為了說明一個(gè)問題,就要把一系列知識(shí)都了解一遍,寫出來(lái)的文章就特別長(zhǎng)。 為了避免一篇...
摘要:為了避免一篇文章的篇幅過長(zhǎng),于是一些比較大的主題就都分成幾篇來(lái)講了,這篇文章是筆者所有文章的目錄,將會(huì)持續(xù)更新,以給大家一個(gè)查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷懽鞯臅r(shí)候發(fā)現(xiàn),為了弄懂一個(gè)知識(shí),不得不先去了解另外一些知識(shí),這樣以來(lái),為了說明一個(gè)問題,就要把一系列知識(shí)都了解一遍,寫出來(lái)的文章就特別長(zhǎng)。 為了避免一篇...
摘要:獲取鎖的過程當(dāng)線程調(diào)用申請(qǐng)獲取鎖資源,如果成功,則進(jìn)入臨界區(qū)。如果隊(duì)列中有其他等待鎖資源的線程需要喚醒,則喚醒隊(duì)列中的第一個(gè)等待節(jié)點(diǎn)先入先出。釋放鎖時(shí),如果隊(duì)列中有等待的線程就進(jìn)行喚醒。 每一個(gè)Java工程師應(yīng)該都或多或少了解過AQS,我自己也是前前后后,反反復(fù)復(fù)研究了很久,看了忘,忘了再看,每次都有不一樣的體會(huì)。這次趁著寫博客,打算重新拿出來(lái)系統(tǒng)的研究下它的源碼,總結(jié)成文章,便于以后...
閱讀 644·2021-09-03 00:22
閱讀 1520·2021-08-03 14:03
閱讀 2255·2021-07-25 21:37
閱讀 819·2019-08-30 13:18
閱讀 2020·2019-08-29 16:19
閱讀 2827·2019-08-29 13:22
閱讀 1440·2019-08-29 12:16
閱讀 2711·2019-08-26 12:16