亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

AbstractQueuedSynchronizer的介紹和原理分析

Yuanf / 1054人閱讀

摘要:同步器擁有三個(gè)成員變量隊(duì)列的頭結(jié)點(diǎn)隊(duì)列的尾節(jié)點(diǎn)和狀態(tài)。對(duì)于同步器維護(hù)的狀態(tài),多個(gè)線程對(duì)其的獲取將會(huì)產(chǎn)生一個(gè)鏈?zhǔn)降慕Y(jié)構(gòu)。使用將當(dāng)前線程,關(guān)于后續(xù)會(huì)詳細(xì)介紹。

簡介

提供了一個(gè)基于FIFO隊(duì)列,可以用于構(gòu)建鎖或者其他相關(guān)同步裝置的基礎(chǔ)框架。該同步器(以下簡稱同步器)利用了一個(gè)int來表示狀態(tài),期望它能夠成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)。使用的方法是繼承,子類通過繼承同步器并需要實(shí)現(xiàn)它的方法來管理其狀態(tài),管理的方式就是通過類似acquire和release的方式來操縱狀態(tài)。然而多線程環(huán)境中對(duì)狀態(tài)的操縱必須確保原子性,因此子類對(duì)于狀態(tài)的把握,需要使用這個(gè)同步器提供的以下三個(gè)方法對(duì)狀態(tài)進(jìn)行操作:

java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()

java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)

java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

子類推薦被定義為自定義同步裝置的內(nèi)部類,同步器自身沒有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干acquire之類的方法來供使用。該同步器即可以作為排他模式也可以作為共享模式,當(dāng)它被定義為一個(gè)排他模式時(shí),其他線程對(duì)其的獲取就被阻止,而共享模式對(duì)于多個(gè)線程獲取都可以成功。

同步器是實(shí)現(xiàn)鎖的關(guān)鍵,利用同步器將鎖的語義實(shí)現(xiàn),然后在鎖的實(shí)現(xiàn)中聚合同步器??梢赃@樣理解:鎖的API是面向使用者的,它定義了與鎖交互的公共行為,而每個(gè)鎖需要完成特定的操作也是透過這些行為來完成的(比如:可以允許兩個(gè)線程進(jìn)行加鎖,排除兩個(gè)以上的線程),但是實(shí)現(xiàn)是依托給同步器來完成;同步器面向的是線程訪問和資源控制,它定義了線程對(duì)資源是否能夠獲取以及線程的排隊(duì)等操作。鎖和同步器很好的隔離了二者所需要關(guān)注的領(lǐng)域,嚴(yán)格意義上講,同步器可以適用于除了鎖以外的其他同步設(shè)施上(包括鎖)。

同步器的開始提到了其實(shí)現(xiàn)依賴于一個(gè)FIFO隊(duì)列,那么隊(duì)列中的元素Node就是保存著線程引用和線程狀態(tài)的容器,每個(gè)線程對(duì)同步器的訪問,都可以看做是隊(duì)列中的一個(gè)節(jié)點(diǎn)。Node的主要包含以下成員變量:

Node {
    int waitStatus;
    Node prev;
    Node next;
    Node nextWaiter;
    Thread thread;
}

以上五個(gè)成員變量主要負(fù)責(zé)保存該節(jié)點(diǎn)的線程引用,同步等待隊(duì)列(以下簡稱sync隊(duì)列)的前驅(qū)和后繼節(jié)點(diǎn),同時(shí)也包括了同步狀態(tài)。

屬性名稱

描述

int waitStatus

表示節(jié)點(diǎn)的狀態(tài)。其中包含的狀態(tài)有:

CANCELLED,值為1,表示當(dāng)前的線程被取消;

SIGNAL,值為-1,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行,也就是unpark;

CONDITION,值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中;

PROPAGATE,值為-3,表示當(dāng)前場景下后續(xù)的acquireShared能夠得以執(zhí)行;

值為0,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖。

Node prev

前驅(qū)節(jié)點(diǎn),比如當(dāng)前節(jié)點(diǎn)被取消,那就需要前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)來完成連接。

Node next

后繼節(jié)點(diǎn)。

Node nextWaiter

存儲(chǔ)condition隊(duì)列中的后繼節(jié)點(diǎn)。

Thread thread

入隊(duì)列時(shí)的當(dāng)前線程。

節(jié)點(diǎn)成為sync隊(duì)列和condition隊(duì)列構(gòu)建的基礎(chǔ),在同步器中就包含了sync隊(duì)列。同步器擁有三個(gè)成員變量:sync隊(duì)列的頭結(jié)點(diǎn)head、sync隊(duì)列的尾節(jié)點(diǎn)tail和狀態(tài)state。對(duì)于鎖的獲取,請(qǐng)求形成節(jié)點(diǎn),將其掛載在尾部,而鎖資源的轉(zhuǎn)移(釋放再獲取)是從頭部開始向后進(jìn)行。對(duì)于同步器維護(hù)的狀態(tài)state,多個(gè)線程對(duì)其的獲取將會(huì)產(chǎn)生一個(gè)鏈?zhǔn)降慕Y(jié)構(gòu)。

API說明

實(shí)現(xiàn)自定義同步器時(shí),需要使用同步器提供的getState()、setState()和compareAndSetState()方法來操縱狀態(tài)的變遷。

protected boolean tryAcquire(int arg)

排它的獲取這個(gè)狀態(tài)。這個(gè)方法的實(shí)現(xiàn)需要查詢當(dāng)前狀態(tài)是否允許獲取,然后再進(jìn)行獲?。ㄊ褂胏ompareAndSetState來做)狀態(tài)。

protected boolean tryRelease(int arg)?

釋放狀態(tài)。

protected int tryAcquireShared(int arg)

共享的模式下獲取狀態(tài)。

protected boolean tryReleaseShared(int arg)

共享的模式下釋放狀態(tài)。

protected boolean isHeldExclusively()

在排它模式下,狀態(tài)是否被占用。

實(shí)現(xiàn)這些方法必須是非阻塞而且是線程安全的,推薦使用該同步器的父類java.util.concurrent.locks.AbstractOwnableSynchronizer來設(shè)置當(dāng)前的線程。

開始提到同步器內(nèi)部基于一個(gè)FIFO隊(duì)列,對(duì)于一個(gè)獨(dú)占鎖的獲取和釋放有以下偽碼可以表示。

獲取一個(gè)排他鎖。

while(獲取鎖) {
    if (獲取到) {
        退出while循環(huán)
    } else {
        if(當(dāng)前線程沒有入隊(duì)列) {
            那么入隊(duì)列
        }
        阻塞當(dāng)前線程
    }
}

釋放一個(gè)排他鎖。

if (釋放成功) {
    刪除頭結(jié)點(diǎn)
    激活原頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)
}

示例

下面通過一個(gè)排它鎖的例子來深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能夠更加深入了解其他的并發(fā)組件。

排他鎖的實(shí)現(xiàn),一次只能一個(gè)線程獲取到鎖。

class Mutex implements Lock, java.io.Serializable {
   // 內(nèi)部類,自定義同步器
   private static class Sync extends AbstractQueuedSynchronizer {
     // 是否處于占用狀態(tài)
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }
     // 當(dāng)狀態(tài)為0的時(shí)候獲取鎖
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }
     // 釋放鎖,將狀態(tài)設(shè)置為0
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }
     // 返回一個(gè)Condition,每個(gè)condition都包含了一個(gè)condition隊(duì)列
     Condition newCondition() { return new ConditionObject(); }
   }
   // 僅需要將操作代理到Sync上即可
   private final Sync sync = new Sync();
   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

可以看到Mutex將Lock接口均代理給了同步器的實(shí)現(xiàn)。

使用方將Mutex構(gòu)造出來之后,調(diào)用lock獲取鎖,調(diào)用unlock進(jìn)行解鎖。下面以Mutex為例子,詳細(xì)分析以下同步器的實(shí)現(xiàn)邏輯。

實(shí)現(xiàn)分析

public final void acquire(int arg)

該方法以排他的方式獲取鎖,對(duì)中斷不敏感,完成synchronized語義。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

上述邏輯主要包括:

嘗試獲取(調(diào)用tryAcquire更改狀態(tài),需要保證原子性);

tryAcquire方法中使用了同步器提供的對(duì)state操作的方法,利用compareAndSet保證只有一個(gè)線程能夠?qū)顟B(tài)進(jìn)行成功修改,而沒有成功修改的線程將進(jìn)入sync隊(duì)列排隊(duì)。

如果獲取不到,將當(dāng)前線程構(gòu)造成節(jié)點(diǎn)Node并加入sync隊(duì)列;

進(jìn)入隊(duì)列的每個(gè)線程都是一個(gè)節(jié)點(diǎn)Node,從而形成了一個(gè)雙向隊(duì)列,類似CLH隊(duì)列,這樣做的目的是線程間的通信會(huì)被限制在較小規(guī)模(也就是兩個(gè)節(jié)點(diǎn)左右)。

再次嘗試獲取,如果沒有獲取到那么將當(dāng)前線程從線程調(diào)度器上摘下,進(jìn)入等待狀態(tài)。

使用LockSupport將當(dāng)前線程unpark,關(guān)于LockSupport后續(xù)會(huì)詳細(xì)介紹。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 快速嘗試在尾部添加
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
            t.next = node;
            return t;
        }
    }
}

上述邏輯主要包括:

使用當(dāng)前線程構(gòu)造Node;

對(duì)于一個(gè)節(jié)點(diǎn)需要做的是將當(dāng)節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)指向尾節(jié)點(diǎn)(current.prev = tail),尾節(jié)點(diǎn)指向它(tail = current),原有的尾節(jié)點(diǎn)的后繼節(jié)點(diǎn)指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節(jié)點(diǎn)的設(shè)置來保證的,也就是compareAndSetTail來完成的。

先行嘗試在隊(duì)尾添加;

如果尾節(jié)點(diǎn)已經(jīng)有了,然后做如下操作:

分配引用T指向尾節(jié)點(diǎn);

將節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)更新為尾節(jié)點(diǎn)(current.prev = tail);

如果尾節(jié)點(diǎn)是T,那么將當(dāng)尾節(jié)點(diǎn)設(shè)置為該節(jié)點(diǎn)(tail = current,原子更新);

T的后繼節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)(T.next = current)。

注意第3點(diǎn)是要求原子的。

這樣可以以最短路徑O(1)的效果來完成線程入隊(duì),是最大化減少開銷的一種方式。

如果隊(duì)尾添加失敗或者是第一個(gè)入隊(duì)的節(jié)點(diǎn)。

如果是第1個(gè)節(jié)點(diǎn),也就是sync隊(duì)列沒有初始化,那么會(huì)進(jìn)入到enq這個(gè)方法,進(jìn)入的線程可能有多個(gè),或者說在addWaiter中沒有成功入隊(duì)的線程都將進(jìn)入enq這個(gè)方法。

可以看到enq的邏輯是確保進(jìn)入的Node都會(huì)有機(jī)會(huì)順序的添加到sync隊(duì)列中,而加入的步驟如下:

如果尾節(jié)點(diǎn)為空,那么原子化的分配一個(gè)頭節(jié)點(diǎn),并將尾節(jié)點(diǎn)指向頭節(jié)點(diǎn),這一步是初始化;

然后是重復(fù)在addWaiter中做的工作,但是在一個(gè)while(true)的循環(huán)中,直到當(dāng)前節(jié)點(diǎn)入隊(duì)為止。

進(jìn)入sync隊(duì)列之后,接下來就是要進(jìn)行鎖的獲取,或者說是訪問控制了,只有一個(gè)線程能夠在同一時(shí)刻繼續(xù)的運(yùn)行,而其他的進(jìn)入等待狀態(tài)。而每個(gè)線程都是一個(gè)獨(dú)立的個(gè)體,它們自省的觀察,當(dāng)條件滿足的時(shí)候(自己的前驅(qū)是頭結(jié)點(diǎn)并且原子性的獲取了狀態(tài)),那么這個(gè)線程能夠繼續(xù)運(yùn)行。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
                }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上述邏輯主要包括:

獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn);

需要獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),而頭結(jié)點(diǎn)所對(duì)應(yīng)的含義是當(dāng)前站有鎖且正在運(yùn)行。

當(dāng)前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且能夠獲取狀態(tài),代表該當(dāng)前節(jié)點(diǎn)占有鎖;

如果滿足上述條件,那么代表能夠占有鎖,根據(jù)節(jié)點(diǎn)對(duì)鎖占有的含義,設(shè)置頭結(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)。

否則進(jìn)入等待狀態(tài)。

如果沒有輪到當(dāng)前節(jié)點(diǎn)運(yùn)行,那么將當(dāng)前線程從線程調(diào)度器上摘下,也就是進(jìn)入等待狀態(tài)。

這里針對(duì)acquire做一下總結(jié):

狀態(tài)的維護(hù);

需要在鎖定時(shí),需要維護(hù)一個(gè)狀態(tài)(int類型),而對(duì)狀態(tài)的操作是原子和非阻塞的,通過同步器提供的對(duì)狀態(tài)訪問的方法對(duì)狀態(tài)進(jìn)行操縱,并且利用compareAndSet來確保原子性的修改。

狀態(tài)的獲取;

一旦成功的修改了狀態(tài),當(dāng)前線程或者說節(jié)點(diǎn),就被設(shè)置為頭節(jié)點(diǎn)。

sync隊(duì)列的維護(hù)。

在獲取資源未果的過程中條件不符合的情況下(不該自己,前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)或者沒有獲取到資源)進(jìn)入睡眠狀態(tài),停止線程調(diào)度器對(duì)當(dāng)前節(jié)點(diǎn)線程的調(diào)度。

這時(shí)引入的一個(gè)釋放的問題,也就是說使睡眠中的Node或者說線程獲得通知的關(guān)鍵,就是前驅(qū)節(jié)點(diǎn)的通知,而這一個(gè)過程就是釋放,釋放會(huì)通知它的后繼節(jié)點(diǎn)從睡眠中返回準(zhǔn)備運(yùn)行。

下面的流程圖基本描述了一次acquire所需要經(jīng)歷的過程:

如上圖所示,其中的判定退出隊(duì)列的條件,判定條件是否滿足和休眠當(dāng)前線程就是完成了自旋spin的過程。

public final boolean release(int arg)

在unlock方法的實(shí)現(xiàn)中,使用了同步器的release方法。相對(duì)于在之前的acquire方法中可以得出調(diào)用acquire,保證能夠獲取到鎖(成功獲取狀態(tài)),而release則表示將狀態(tài)設(shè)置回去,也就是將資源釋放,或者說將鎖釋放。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

上述邏輯主要包括:

嘗試釋放狀態(tài);

tryRelease能夠保證原子化的將狀態(tài)設(shè)置回去,當(dāng)然需要使用compareAndSet來保證。如果釋放狀態(tài)成功過之后,將會(huì)進(jìn)入后繼節(jié)點(diǎn)的喚醒過程。

喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)所包含的線程。

通過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續(xù)acquire狀態(tài)。

private void unparkSuccessor(Node node) {
    // 將狀態(tài)設(shè)置為同步狀態(tài)
    int ws = node.waitStatus;
    if (ws < 0)      compareAndSetWaitStatus(node, ws, 0);   // 獲取當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),如果滿足狀態(tài),那么進(jìn)行喚醒操作  // 如果沒有滿足狀態(tài),從尾部開始找尋符合要求的節(jié)點(diǎn)并將其喚醒     Node s = node.next;     if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
        }
    if (s != null)
        LockSupport.unpark(s.thread);
}

上述邏輯主要包括,該方法取出了當(dāng)前節(jié)點(diǎn)的next引用,然后對(duì)其線程(Node)進(jìn)行了喚醒,這時(shí)就只有一個(gè)或合理個(gè)數(shù)的線程被喚醒,被喚醒的線程繼續(xù)進(jìn)行對(duì)資源的獲取與爭奪。

回顧整個(gè)資源的獲取和釋放過程:

在獲取時(shí),維護(hù)了一個(gè)sync隊(duì)列,每個(gè)節(jié)點(diǎn)都是一個(gè)線程在進(jìn)行自旋,而依據(jù)就是自己是否是首節(jié)點(diǎn)的后繼并且能夠獲取資源; 在釋放時(shí),僅僅需要將資源還回去,然后通知一下后繼節(jié)點(diǎn)并將其喚醒。

這里需要注意,隊(duì)列的維護(hù)(首節(jié)點(diǎn)的更換)是依靠消費(fèi)者(獲取時(shí))來完成的,也就是說在滿足了自旋退出的條件時(shí)的一刻,這個(gè)節(jié)點(diǎn)就會(huì)被設(shè)置成為首節(jié)點(diǎn)。

protected boolean tryAcquire(int arg)

tryAcquire是自定義同步器需要實(shí)現(xiàn)的方法,也就是自定義同步器非阻塞原子化的獲取狀態(tài),如果鎖該方法一般用于Lock的tryLock實(shí)現(xiàn)中,這個(gè)特性是synchronized無法提供的。

public final void acquireInterruptibly(int arg)

該方法提供獲取狀態(tài)能力,當(dāng)然在無法獲取狀態(tài)的情況下會(huì)進(jìn)入sync隊(duì)列進(jìn)行排隊(duì),這類似acquire,但是和acquire不同的地方在于它能夠在外界對(duì)當(dāng)前線程進(jìn)行中斷的時(shí)候提前結(jié)束獲取狀態(tài)的操作,換句話說,就是在類似synchronized獲取鎖時(shí),外界能夠?qū)Ξ?dāng)前線程進(jìn)行中斷,并且獲取鎖的這個(gè)操作能夠響應(yīng)中斷并提前返回。一個(gè)線程處于synchronized塊中或者進(jìn)行同步I/O操作時(shí),對(duì)該線程進(jìn)行中斷操作,這時(shí)該線程的中斷標(biāo)識(shí)位被設(shè)置為true,但是線程依舊繼續(xù)運(yùn)行。

如果在獲取一個(gè)通過網(wǎng)絡(luò)交互實(shí)現(xiàn)的鎖時(shí),這個(gè)鎖資源突然進(jìn)行了銷毀,那么使用acquireInterruptibly的獲取方式就能夠讓該時(shí)刻嘗試獲取鎖的線程提前返回。而同步器的這個(gè)特性被實(shí)現(xiàn)Lock接口中的lockInterruptibly方法。根據(jù)Lock的語義,在被中斷時(shí),lockInterruptibly將會(huì)拋出InterruptedException來告知使用者。

public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 檢測中斷標(biāo)志位
            if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上述邏輯主要包括:

檢測當(dāng)前線程是否被中斷;

判斷當(dāng)前線程的中斷標(biāo)志位,如果已經(jīng)被中斷了,那么直接拋出異常并將中斷標(biāo)志位設(shè)置為false。

嘗試獲取狀態(tài);

調(diào)用tryAcquire獲取狀態(tài),如果順利會(huì)獲取成功并返回。

構(gòu)造節(jié)點(diǎn)并加入sync隊(duì)列;

獲取狀態(tài)失敗后,將當(dāng)前線程引用構(gòu)造為節(jié)點(diǎn)并加入到sync隊(duì)列中。退出隊(duì)列的方式在沒有中斷的場景下和acquireQueued類似,當(dāng)頭結(jié)點(diǎn)是自己的前驅(qū)節(jié)點(diǎn)并且能夠獲取到狀態(tài)時(shí),即可以運(yùn)行,當(dāng)然要將本節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),表示正在運(yùn)行。

中斷檢測。

在每次被喚醒時(shí),進(jìn)行中斷檢測,如果發(fā)現(xiàn)當(dāng)前線程被中斷,那么拋出InterruptedException并退出循環(huán)。

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException

該方法提供了具備有超時(shí)功能的獲取狀態(tài)的調(diào)用,如果在指定的nanosTimeout內(nèi)沒有獲取到狀態(tài),那么返回false,反之返回true??梢詫⒃摲椒醋鯽cquireInterruptibly的升級(jí)版,也就是在判斷是否被中斷的基礎(chǔ)上增加了超時(shí)控制。

針對(duì)超時(shí)控制這部分的實(shí)現(xiàn),主要需要計(jì)算出睡眠的delta,也就是間隔值。間隔可以表示為nanosTimeout = 原有nanosTimeout – now(當(dāng)前時(shí)間)+
lastTime(睡眠之前記錄的時(shí)間)。如果nanosTimeout大于0,那么還需要使當(dāng)前線程睡眠,反之則返回false。

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
    long lastTime = System.nanoTime();
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            if (nanosTimeout <= 0)               return false;           if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            //計(jì)算時(shí)間,當(dāng)前時(shí)間減去睡眠之前的時(shí)間得到睡眠的時(shí)間,然后被
            //原有超時(shí)時(shí)間減去,得到了還應(yīng)該睡眠的時(shí)間
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上述邏輯主要包括:

加入sync隊(duì)列;

將當(dāng)前線程構(gòu)造成為節(jié)點(diǎn)Node加入到sync隊(duì)列中。

條件滿足直接返回;

退出條件判斷,如果前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲取到狀態(tài),那么設(shè)置自己為頭結(jié)點(diǎn)并退出,返回true,也就是在指定的nanosTimeout之前獲取了鎖。

獲取狀態(tài)失敗休眠一段時(shí)間;

通過LockSupport.unpark來指定當(dāng)前線程休眠一段時(shí)間。

計(jì)算再次休眠的時(shí)間;

喚醒后的線程,計(jì)算仍需要休眠的時(shí)間,該時(shí)間表示為nanosTimeout = 原有nanosTimeout – now(當(dāng)前時(shí)間)+ lastTime(睡眠之前記錄的時(shí)間)。其中now
– lastTime表示這次睡眠所持續(xù)的時(shí)間。

休眠時(shí)間的判定。

喚醒后的線程,計(jì)算仍需要休眠的時(shí)間,并無阻塞的嘗試再獲取狀態(tài),如果失敗后查看其nanosTimeout是否大于0,如果小于0,那么返回完全超時(shí),沒有獲取到鎖。 如果nanosTimeout小于等于1000L納秒,則進(jìn)入快速的自旋過程。那么快速自旋會(huì)造成處理器資源緊張嗎?結(jié)果是不會(huì),經(jīng)過測算,開銷看起來很小,幾乎微乎其微。Doug Lea應(yīng)該測算了在線程調(diào)度器上的切換造成的額外開銷,因此在短時(shí)1000納秒內(nèi)就讓當(dāng)前線程進(jìn)入快速自旋狀態(tài),如果這時(shí)再休眠相反會(huì)讓nanosTimeout的獲取時(shí)間變得更加不精確。

上述過程可以如下圖所示:

上述這個(gè)圖中可以理解為在類似獲取狀態(tài)需要排隊(duì)的基礎(chǔ)上增加了一個(gè)超時(shí)控制的邏輯。每次超時(shí)的時(shí)間就是當(dāng)前超時(shí)剩余的時(shí)間減去睡眠的時(shí)間,而在這個(gè)超時(shí)時(shí)間的基礎(chǔ)上進(jìn)行了判斷,如果大于0那么繼續(xù)睡眠(等待),可以看出這個(gè)超時(shí)版本的獲取狀態(tài)只是一個(gè)近似超時(shí)的獲取狀態(tài),因此任何含有超時(shí)的調(diào)用基本結(jié)果就是近似于給定超時(shí)。

public final void acquireShared(int arg)

調(diào)用該方法能夠以共享模式獲取狀態(tài),共享模式和之前的獨(dú)占模式有所區(qū)別。以文件的查看為例,如果一個(gè)程序在對(duì)其進(jìn)行讀取操作,那么這一時(shí)刻,對(duì)這個(gè)文件的寫操作就被阻塞,相反,這一時(shí)刻另一個(gè)程序?qū)ζ溥M(jìn)行同樣的讀操作是可以進(jìn)行的。如果一個(gè)程序在對(duì)其進(jìn)行寫操作,那么所有的讀與寫操作在這一時(shí)刻就被阻塞,直到這個(gè)程序完成寫操作。

以讀寫場景為例,描述共享和獨(dú)占的訪問模式,如下圖所示:

上圖中,紅色代表被阻塞,綠色代表可以通過。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)   doAcquireShared(arg); } private void doAcquireShared(int arg) {     final Node node = addWaiter(Node.SHARED);   boolean failed = true;  try {       boolean interrupted = false;        for (;;) {          final Node p = node.predecessor();          if (p == head) {                int r = tryAcquireShared(arg);              if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
            interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上述邏輯主要包括:

嘗試獲取共享狀態(tài);

調(diào)用tryAcquireShared來獲取共享狀態(tài),該方法是非阻塞的,如果獲取成功則立刻返回,也就表示獲取共享鎖成功。

獲取失敗進(jìn)入sync隊(duì)列;

在獲取共享狀態(tài)失敗后,當(dāng)前時(shí)刻有可能是獨(dú)占鎖被其他線程所把持,那么將當(dāng)前線程構(gòu)造成為節(jié)點(diǎn)(共享模式)加入到sync隊(duì)列中。

循環(huán)內(nèi)判斷退出隊(duì)列條件;

如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且獲取共享狀態(tài)成功,這里和獨(dú)占鎖acquire的退出隊(duì)列條件類似。

獲取共享狀態(tài)成功;

在退出隊(duì)列的條件上,和獨(dú)占鎖之間的主要區(qū)別在于獲取共享狀態(tài)成功之后的行為,而如果共享狀態(tài)獲取成功之后會(huì)判斷后繼節(jié)點(diǎn)是否是共享模式,如果是共享模式,那么就直接對(duì)其進(jìn)行喚醒操作,也就是同時(shí)激發(fā)多個(gè)線程并發(fā)的運(yùn)行。

獲取共享狀態(tài)失敗。

通過使用LockSupport將當(dāng)前線程從線程調(diào)度器上摘下,進(jìn)入休眠狀態(tài)。

對(duì)于上述邏輯中,節(jié)點(diǎn)之間的通知過程如下圖所示:

上圖中,綠色表示共享節(jié)點(diǎn),它們之間的通知和喚醒操作是在前驅(qū)節(jié)點(diǎn)獲取狀態(tài)時(shí)就進(jìn)行的,紅色表示獨(dú)占節(jié)點(diǎn),它的被喚醒必須取決于前驅(qū)節(jié)點(diǎn)的釋放,也就是release操作,可以看出來圖中的獨(dú)占節(jié)點(diǎn)如果要運(yùn)行,必須等待前面的共享節(jié)點(diǎn)均釋放了狀態(tài)才可以。而獨(dú)占節(jié)點(diǎn)如果獲取了狀態(tài),那么后續(xù)的獨(dú)占式獲取和共享式獲取均被阻塞。

public final boolean releaseShared(int arg)

調(diào)用該方法釋放共享狀態(tài),每次獲取共享狀態(tài)acquireShared都會(huì)操作狀態(tài),同樣在共享鎖釋放的時(shí)候,也需要將狀態(tài)釋放。比如說,一個(gè)限定一定數(shù)量訪問的同步工具,每次獲取都是共享的,但是如果超過了一定的數(shù)量,將會(huì)阻塞后續(xù)的獲取操作,只有當(dāng)之前獲取的消費(fèi)者將狀態(tài)釋放才可以使阻塞的獲取操作得以運(yùn)行。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

上述邏輯主要就是調(diào)用同步器的tryReleaseShared方法來釋放狀態(tài),并同時(shí)在doReleaseShared方法中喚醒其后繼節(jié)點(diǎn)。

一個(gè)例子

在上述對(duì)同步器AbstractQueuedSynchronizer進(jìn)行了實(shí)現(xiàn)層面的分析之后,我們通過一個(gè)例子來加深對(duì)同步器的理解:

設(shè)計(jì)一個(gè)同步工具,該工具在同一時(shí)刻,只能有兩個(gè)線程能夠并行訪問,超過限制的其他線程進(jìn)入阻塞狀態(tài)。

對(duì)于這個(gè)需求,可以利用同步器完成一個(gè)這樣的設(shè)定,定義一個(gè)初始狀態(tài),為2,一個(gè)線程進(jìn)行獲取那么減1,一個(gè)線程釋放那么加1,狀態(tài)正確的范圍在[0,1,2]三個(gè)之間,當(dāng)在0時(shí),代表再有新的線程對(duì)資源進(jìn)行獲取時(shí)只能進(jìn)入阻塞狀態(tài)(注意在任何時(shí)候進(jìn)行狀態(tài)變更的時(shí)候均需要以CAS作為原子性保障)。

public class TwinsLock implements Lock {
    private static final Sync sync = new Sync();
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7889272986162341211L;
        {
            setState(2);
        }

        protected boolean tryAcquire(int arg) {
            if (arg != 1) {
                return false;
            }
            int currentStats = getState();
            if (currentStats <= 0) {
                return false;
            }
            if (compareAndSetState(currentStats, currentStats - 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int arg) {
            if (arg != 1) {
                return false;
            }
            for(;;) {
                int currentStats = getState();
                if (compareAndSetState(currentStats, currentStats + 1)) {
                setExclusiveOwnerThread(null);
                return true;
                }
            }
        }

        protected boolean isHeldExclusively() {
                return getState() < 2;
        }
        }

    public void lock() {
        sync.acquire(1);
        }

        public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
        }

    public boolean tryLock() {
        return sync.tryAcquire(1);
        }

        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
        }

        public void unlock() {
        sync.release(1);
        }
}

這里我們編寫一個(gè)測試來驗(yàn)證TwinsLock是否能夠正常工作并達(dá)到預(yù)期。

public class TwinsLockTest {

    @Test
    public void test() {
        final Lock lock = new TwinsLock();

        class Worker extends Thread {
            public void run() {
                while (true) {
                    lock.lock();

                    try {
                        Thread.sleep(1000L);
                System.out.println(Thread.currentThread());
                        Thread.sleep(1000L);
                    } catch (Exception ex) {

                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.start();
        }

        new Thread() {
            public void run() {
                while (true) {

                    try {
                        Thread.sleep(200L);
                        System.out.println();
                    } catch (Exception ex) {

                    }
                }
            }
        }.start();

        try {
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述測試用例的邏輯主要包括:

打印線程

Worker在兩次睡眠之間打印自身線程,如果一個(gè)時(shí)刻只能有兩個(gè)線程同時(shí)訪問,那么打印出來的內(nèi)容將是成對(duì)出現(xiàn)。

分隔線程

不停的打印換行,能讓W(xué)orker的輸出看起來更加直觀。

該測試的結(jié)果是在一個(gè)時(shí)刻,僅有兩個(gè)線程能夠獲得到鎖,并完成打印,而表象就是打印的內(nèi)容成對(duì)出現(xiàn)。

by 魏鵬 via ifeve

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/64024.html

相關(guān)文章

  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...

    supernavy 評(píng)論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...

    ddongjian0000 評(píng)論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...

    wangdai 評(píng)論0 收藏0
  • Java 重入鎖 ReentrantLock 原理分析

    摘要:的主要功能和關(guān)鍵字一致,均是用于多線程的同步。而僅支持通過查詢當(dāng)前線程是否持有鎖。由于和使用的是同一把可重入鎖,所以線程可以進(jìn)入方法,并再次獲得鎖,而不會(huì)被阻塞住。公平與非公平公平與非公平指的是線程獲取鎖的方式。 1.簡介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關(guān)鍵字類似。所謂的可重入是指,線程可對(duì)同一把鎖進(jìn)行重復(fù)加鎖,而不會(huì)被阻...

    lx1036 評(píng)論0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - 獨(dú)占/共享模式

    摘要:簡介抽象隊(duì)列同步器,以下簡稱出現(xiàn)在中,由大師所創(chuàng)作。獲取成功則返回,獲取失敗,線程進(jìn)入同步隊(duì)列等待。響應(yīng)中斷版的超時(shí)響應(yīng)中斷版的共享式獲取同步狀態(tài),同一時(shí)刻可能會(huì)有多個(gè)線程獲得同步狀態(tài)。 1.簡介 AbstractQueuedSynchronizer (抽象隊(duì)列同步器,以下簡稱 AQS)出現(xiàn)在 JDK 1.5 中,由大師 Doug Lea 所創(chuàng)作。AQS 是很多同步器的基礎(chǔ)框架,比如 ...

    pf_miles 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<