摘要:在章節(jié)中,我們說(shuō)過(guò),維護(hù)了一把全局鎖,無(wú)論是出隊(duì)還是入隊(duì),都共用這把鎖,這就導(dǎo)致任一時(shí)間點(diǎn)只有一個(gè)線程能夠執(zhí)行。入隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,出隊(duì)鎖對(duì)應(yīng)的是條件隊(duì)列,所以每入隊(duì)一個(gè)元素,應(yīng)當(dāng)立即去喚醒可能阻塞的其它入隊(duì)線程。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、LinkedBlockingQueue簡(jiǎn)介
LinkedBlockingQueue是在JDK1.5時(shí),隨著J.U.C包引入的一種阻塞隊(duì)列,它實(shí)現(xiàn)了BlockingQueue接口,底層基于單鏈表實(shí)現(xiàn):
LinkedBlockingQueue是一種近似有界阻塞隊(duì)列,為什么說(shuō)近似?因?yàn)長(zhǎng)inkedBlockingQueue既可以在初始構(gòu)造時(shí)就指定隊(duì)列的容量,也可以不指定,如果不指定,那么它的容量大小默認(rèn)為Integer.MAX_VALUE。
LinkedBlockingQueue除了底層數(shù)據(jù)結(jié)構(gòu)(單鏈表)與ArrayBlockingQueue不同外,另外一個(gè)特點(diǎn)就是:
它維護(hù)了兩把鎖——takeLock和putLock。
takeLock用于控制出隊(duì)的并發(fā),putLock用于入隊(duì)的并發(fā)。這也就意味著,同一時(shí)刻,只能只有一個(gè)線程能執(zhí)行入隊(duì)/出隊(duì)操作,其余入隊(duì)/出隊(duì)線程會(huì)被阻塞;但是,入隊(duì)和出隊(duì)之間可以并發(fā)執(zhí)行,即同一時(shí)刻,可以同時(shí)有一個(gè)線程進(jìn)行入隊(duì),另一個(gè)線程進(jìn)行出隊(duì),這樣就可以提升吞吐量。
在ArrayBlockingQueue章節(jié)中,我們說(shuō)過(guò),ArrayBlockingQueue維護(hù)了一把全局鎖,無(wú)論是出隊(duì)還是入隊(duì),都共用這把鎖,這就導(dǎo)致任一時(shí)間點(diǎn)只有一個(gè)線程能夠執(zhí)行。那么對(duì)于“生產(chǎn)者-消費(fèi)者”模式來(lái)說(shuō),意味著生產(chǎn)者和消費(fèi)者不能并發(fā)執(zhí)行。二、LinkedBlockingQueue原理 構(gòu)造
LinkedBlockingQueue提供了三種構(gòu)造器:
/** * 默認(rèn)構(gòu)造器. * 隊(duì)列容量為Integer.MAX_VALUE. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
/** * 顯示指定隊(duì)列容量的構(gòu)造器 */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); }
/** * 從已有集合構(gòu)造隊(duì)列. * 隊(duì)列容量為Integer.MAX_VALUE */ public LinkedBlockingQueue(Collection extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // 這里加鎖僅僅是為了保證可見性 try { int n = 0; for (E e : c) { if (e == null) // 隊(duì)列不能包含null元素 throw new NullPointerException(); if (n == capacity) // 隊(duì)列已滿 throw new IllegalStateException("Queue full"); enqueue(new Node(e)); // 隊(duì)尾插入元素 ++n; } count.set(n); // 設(shè)置元素個(gè)數(shù) } finally { putLock.unlock(); } }
可以看到,如果不指定容量,那么它的容量大小默認(rèn)為Integer.MAX_VALUE。另外,LinkedBlockingQueue使用了一個(gè)原子變量AtomicInteger記錄隊(duì)列中元素的個(gè)數(shù),以保證入隊(duì)/出隊(duì)并發(fā)修改元素時(shí)的數(shù)據(jù)一致性。
public class LinkedBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { /** * 隊(duì)列容量. * 如果不指定, 則為Integer.MAX_VALUE */ private final int capacity; /** * 隊(duì)列中的元素個(gè)數(shù) */ private final AtomicInteger count = new AtomicInteger(); /** * 隊(duì)首指針. * head.item == null */ transient Node head; /** * 隊(duì)尾指針. * last.next == null */ private transient Node last; /** * 出隊(duì)鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** * 隊(duì)列空時(shí),出隊(duì)線程在該條件隊(duì)列等待 */ private final Condition notEmpty = takeLock.newCondition(); /** * 入隊(duì)鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** * 隊(duì)列滿時(shí),入隊(duì)線程在該條件隊(duì)列等待 */ private final Condition notFull = putLock.newCondition(); /** * 鏈表結(jié)點(diǎn)定義 */ static class Node { E item; Node next; // 后驅(qū)指針 Node(E x) { item = x; } } //... }
構(gòu)造完成后,LinkedBlockingQueue的初始結(jié)構(gòu)如下:
插入部分元素后的LinkedBlockingQueue結(jié)構(gòu):
核心方法由于接口和ArrayBlockingQueue完全一樣,所以LinkedBlockingQueue會(huì)阻塞線程的方法也一共有4個(gè):put(E e)、offer(e, time, unit)和take()、poll(time, unit),我們先來(lái)看插入元素的方法。
插入元素——put(E e)
/** * 在隊(duì)尾插入指定的元素. * 如果隊(duì)列已滿,則阻塞線程. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Nodenode = new Node (e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 獲取“入隊(duì)鎖” try { while (count.get() == capacity) { // 隊(duì)列已滿, 則線程在notFull上等待 notFull.await(); } enqueue(node); // 將新結(jié)點(diǎn)鏈接到“隊(duì)尾” /** * c+1 表示的元素個(gè)數(shù). * 如果,則喚醒一個(gè)“入隊(duì)線程” */ c = count.getAndIncrement(); // c表示入隊(duì)前的隊(duì)列元素個(gè)數(shù) if (c + 1 < capacity) // 入隊(duì)后隊(duì)列未滿, 則喚醒一個(gè)“入隊(duì)線程” notFull.signal(); } finally { putLock.unlock(); } if (c == 0) // 隊(duì)列初始為空, 則喚醒一個(gè)“出隊(duì)線程” signalNotEmpty(); }
插入元素時(shí),首先需要獲得“入隊(duì)鎖”,如果隊(duì)列滿了,則當(dāng)前線程需要在notFull條件隊(duì)列等待;否則,將新元素鏈接到隊(duì)列尾部。
這里需要注意的是兩個(gè)地方:
①每入隊(duì)一個(gè)元素后,如果隊(duì)列還沒滿,則需要喚醒其它可能正在等待的“入隊(duì)線程”:
/** * c+1 表示的元素個(gè)數(shù). * 如果,則喚醒一個(gè)“入隊(duì)線程” */ c = count.getAndIncrement(); // c表示入隊(duì)前的隊(duì)列元素個(gè)數(shù) if (c + 1 < capacity) // 入隊(duì)后隊(duì)列未滿, 則喚醒一個(gè)“入隊(duì)線程” notFull.signal();
② 每入隊(duì)一個(gè)元素,都要判斷下隊(duì)列是否空了,如果空了,說(shuō)明可能存在正在等待的“出隊(duì)線程”,需要喚醒它:
if (c == 0) // 隊(duì)列為空, 則喚醒一個(gè)“出隊(duì)線程” signalNotEmpty();
這里為什么不像ArrayBlockingQueue那樣,入隊(duì)完成后,直接喚醒一個(gè)在notEmpty上等待的出隊(duì)線程?
因?yàn)锳rrayBlockingQueue中,入隊(duì)/出隊(duì)用的是同一把鎖,兩者不會(huì)并發(fā)執(zhí)行,所以每入隊(duì)一個(gè)元素(拿到鎖),就可以通知可能正在等待的“出隊(duì)線程”。(同一個(gè)鎖的兩個(gè)條件隊(duì)列:notEmpty、notFull)
ArrayBlockingQueue中的enqueue方法:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 隊(duì)列已滿,則重置索引為0 putIndex = 0; count++; // 元素個(gè)數(shù)+1 notEmpty.signal(); // 喚醒一個(gè)notEmpty上的等待線程(可以來(lái)隊(duì)列取元素了) }
而LinkedBlockingQueue中,入隊(duì)/出隊(duì)用的是兩把鎖,入隊(duì)/出隊(duì)是會(huì)并發(fā)執(zhí)行的。入隊(duì)鎖對(duì)應(yīng)的是notFull條件隊(duì)列,出隊(duì)鎖對(duì)應(yīng)的是notEmpty條件隊(duì)列,所以每入隊(duì)一個(gè)元素,應(yīng)當(dāng)立即去喚醒可能阻塞的其它入隊(duì)線程。當(dāng)隊(duì)列為空時(shí),說(shuō)明后面再來(lái)“出隊(duì)線程”,一定都會(huì)阻塞,所以此時(shí)可以去喚醒一個(gè)出隊(duì)線程,以提升性能。
試想以下,如果去掉上面的①和②,當(dāng)入隊(duì)線程拿到“入隊(duì)鎖”,入隊(duì)元素后,直接嘗試喚醒出隊(duì)線程,會(huì)要求去拿出隊(duì)鎖,這樣持有鎖A的同時(shí),再去嘗試獲取鎖B,很可能引起死鎖,就算通過(guò)打破死鎖的條件避免死鎖,每次操作同時(shí)獲取兩把鎖也會(huì)降低性能。
刪除元素——table()
刪除元素的邏輯和插入元素類似。刪除元素時(shí),首先需要獲得“出隊(duì)鎖”,如果隊(duì)列為空,則當(dāng)前線程需要在notEmpty條件隊(duì)列等待;否則,從隊(duì)首出隊(duì)一個(gè)元素:
/** * 從隊(duì)首出隊(duì)一個(gè)元素 */ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 獲取“出隊(duì)鎖” takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊(duì)列為空, 則阻塞線程 notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); // c表示出隊(duì)前的元素個(gè)數(shù) if (c > 1) // 出隊(duì)前隊(duì)列非空, 則喚醒一個(gè)出隊(duì)線程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) // 隊(duì)列初始為滿,則喚醒一個(gè)入隊(duì)線程 signalNotFull(); return x; }
/** * 隊(duì)首出隊(duì)一個(gè)元素. */ private E dequeue() { Nodeh = head; Node first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
上面需要的注意點(diǎn)和插入元素一樣:
①每出隊(duì)一個(gè)元素前,如果隊(duì)列非空,則需要喚醒其它可能正在等待的“出隊(duì)線程”:
c = count.getAndDecrement(); // c表示出隊(duì)前的元素個(gè)數(shù) if (c > 1) // 出隊(duì)前隊(duì)列非空, 則喚醒一個(gè)出隊(duì)線程 notEmpty.signal();
② 每入隊(duì)一個(gè)元素,都要判斷下隊(duì)列是否滿,如果是滿的,說(shuō)明可能存在正在等待的“入隊(duì)線程”,需要喚醒它:
if (c == capacity) // 隊(duì)列初始為滿,則喚醒一個(gè)入隊(duì)線程 signalNotFull();三、總結(jié)
歸納一下,LinkedBlockingQueue和ArrayBlockingQueue比較主要有以下區(qū)別:
隊(duì)列大小不同。ArrayBlockingQueue初始構(gòu)造時(shí)必須指定大小,而LinkedBlockingQueue構(gòu)造時(shí)既可以指定大小,也可以不指定(默認(rèn)為Integer.MAX_VALUE,近似于無(wú)界);
底層數(shù)據(jù)結(jié)構(gòu)不同。ArrayBlockingQueue底層采用數(shù)組作為數(shù)據(jù)存儲(chǔ)容器,而LinkedBlockingQueue底層采用單鏈表作為數(shù)據(jù)存儲(chǔ)容器;
兩者的加鎖機(jī)制不同。ArrayBlockingQueue使用一把全局鎖,即入隊(duì)和出隊(duì)使用同一個(gè)ReentrantLock鎖;而LinkedBlockingQueue進(jìn)行了鎖分離,入隊(duì)使用一個(gè)ReentrantLock鎖(putLock),出隊(duì)使用另一個(gè)ReentrantLock鎖(takeLock);
LinkedBlockingQueue不能指定公平/非公平策略(默認(rèn)都是非公平),而ArrayBlockingQueue可以指定策略。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/77045.html
摘要:接口截止目前為止,我們介紹的阻塞隊(duì)列都是實(shí)現(xiàn)了接口。該類在構(gòu)造時(shí)一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過(guò)來(lái)保證線程安全,所以的整體實(shí)現(xiàn)時(shí)比較簡(jiǎn)單的。另外,雙端隊(duì)列相比普通隊(duì)列,主要是多了隊(duì)尾出隊(duì)元素隊(duì)首入隊(duì)元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來(lái)看下的類繼承圖可以看到,實(shí)現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過(guò)實(shí)現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計(jì)的有序工具類。唯一的區(qū)別是針對(duì)的僅僅是鍵值,針對(duì)鍵值對(duì)進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個(gè)線程同時(shí)進(jìn)行寫操作時(shí),才會(huì)進(jìn)行同步。可以看到,上述方法返回一個(gè)迭代器對(duì)象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過(guò)程中不會(huì)拋出并發(fā)修改異常。另外,迭代器對(duì)象也不支持修改方法,全部會(huì)拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過(guò)了,底層基于跳表實(shí)現(xiàn),其操作平均時(shí)間復(fù)雜度均為。事實(shí)上,內(nèi)部引用了一個(gè)對(duì)象,以組合方式,委托對(duì)象實(shí)現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對(duì)快照進(jìn)行的,不會(huì)拋出,且迭代過(guò)程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
閱讀 3414·2021-11-19 11:36
閱讀 2999·2021-09-27 13:34
閱讀 2058·2021-09-22 15:17
閱讀 2464·2019-08-30 13:49
閱讀 815·2019-08-26 13:58
閱讀 1418·2019-08-26 10:47
閱讀 2591·2019-08-23 18:05
閱讀 672·2019-08-23 14:25