摘要:里提供了多個用于控制多線程同步的同步原語,這些原語,包含在的標(biāo)準(zhǔn)庫當(dāng)中。例如總結(jié)多線程同步,說難也難,說不難也很容易,關(guān)鍵是要看你的業(yè)務(wù)場景和解決問題的思路,盡量降低多線程之間的依賴,理清楚業(yè)務(wù)流程,選擇合適的方法,則事盡成。
概述
多線程給我們帶來的好處是可以并發(fā)的執(zhí)行多個任務(wù),特別是對于I/O密集型的業(yè)務(wù),使用多線程,可以帶來成倍的性能增長。
可是當(dāng)我們多個線程需要修改同一個數(shù)據(jù),在不做任何同步控制的情況下,產(chǎn)生的結(jié)果往往是不可預(yù)料的,比如兩個線程,一個輸出hello,一個輸出world,實際運(yùn)行的結(jié)果,往往可能是一個是hello world,一個是world hello。
python里提供了多個用于控制多線程同步的同步原語,這些原語,包含在python的標(biāo)準(zhǔn)庫threading.py當(dāng)中。我今天簡單的介紹一下python里的這些控制多線程同步的原語,包括:Locks、RLocks、Semaphores、Events、Conditions和Barriers,你也可以繼承這些類,實現(xiàn)自己的同步控制原語。
Lock(鎖)Locks是python里最簡單的同步原語,只包括兩個狀態(tài):locked和unlocked,剛創(chuàng)建時狀態(tài)是unlocked。Locks有兩個方法,acquire和release。acquire方法加鎖,release方法釋放鎖,如果acquire枷鎖失敗,則阻塞,表明其他線程已經(jīng)加鎖。release方法只有當(dāng)狀態(tài)是locked調(diào)用方法True,如果是unlocked狀態(tài),調(diào)用release方法會拋出RunTimeError異常。例如代碼:
from threading import Lock, Thread lock = Lock() g = 0 def add_one(): """ Just used for demonstration. It’s bad to use the ‘global’ statement in general. """ global g lock.acquire() g += 1 lock.release() def add_two(): global g lock.acquire() g += 2 lock.release() threads = [] for func in [add_one, add_two]: threads.append(Thread(target=func)) threads[-1].start() for thread in threads: """ Waits for threads to complete before moving on with the main script. """ thread.join() print(g)
最終輸出的結(jié)果是3,通過Lock的使用,雖然在兩個線程中修改了同一個全局變量,但兩個線程是順序計算出結(jié)果的。
RLock(循環(huán)鎖)上面的Lock對象雖然能達(dá)到同步的效果,但是無法得知當(dāng)前是那個線程獲取到了鎖。如果鎖沒被釋放,則其他獲取這個鎖的線程都會被阻塞住。如果不想阻塞,可以使用RLock,例如:
# 使用Lock import threading num = 0 lock = Threading.Lock() lock.acquire() num += 1 lock.acquire() # 這個地方阻塞 num += 2 lock.release() # 使用RLock lock = Threading.RLock() lock.acquire() num += 3 lock.acquire() # 這不會阻塞 num += 4 lock.release() lock.release() # 這個地方注意是釋放兩次鎖Semaphores
Semaphores是個最簡單的計數(shù)器,有兩個方法acquire()和release(),如果有多個線程調(diào)用acquire()方法,acquire()方法會阻塞住,每當(dāng)調(diào)用次acquire方法,就做一次減1操作,每當(dāng)release()方法調(diào)用此次,就加1,如果最后的計數(shù)數(shù)值大于調(diào)用acquire()方法的線程數(shù)目,release()方法會拋出ValueError異常。下面是個生產(chǎn)者消費(fèi)者的示例。
import random, time from threading import BoundedSemaphore, Thread max_items = 5 container = BoundedSemaphore(max_items) def producer(nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) print(time.ctime(), end=": ") try: container.release() print("Produced an item.") except ValueError: print("Full, skipping.") def consumer(nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) print(time.ctime(), end=": ") if container.acquire(False): print("Consumed an item.") else: print("Empty, skipping.") threads = [] nloops = random.randrange(3, 6) print("Starting with %s items." % max_items) threads.append(Thread(target=producer, args=(nloops,))) threads.append(Thread(target=consumer, args=(random.randrange(nloops, nloops+max_items+2),))) for thread in threads: # Starts all the threads. thread.start() for thread in threads: # Waits for threads to complete before moving on with the main script. thread.join() print("All done.")
threading模塊還提供了一個Semaphore對象,它允許你可以任意次的調(diào)用release函數(shù),但是最好還是使用BoundedSemaphore對象,這樣在release調(diào)用次數(shù)過多時會報錯,有益于查找錯誤。Semaphores最長用來限制資源的使用,比如最多十個進(jìn)程。
Eventsevent可以充當(dāng)多進(jìn)程之間的通信工具,基于一個內(nèi)部的標(biāo)志,線程可以調(diào)用set()和clear()方法來操作這個標(biāo)志,其他線程則阻塞在wait()函數(shù),直到標(biāo)志被設(shè)置為True。下面的代碼展示了如何利用Events來追蹤行為。
import random, time from threading import Event, Thread event = Event() def waiter(event, nloops): for i in range(nloops): print(“%s. Waiting for the flag to be set.” % (i+1)) event.wait() # Blocks until the flag becomes true. print(“Wait complete at:”, time.ctime()) event.clear() # Resets the flag. print() def setter(event, nloops): for i in range(nloops): time.sleep(random.randrange(2, 5)) # Sleeps for some time. event.set() threads = [] nloops = random.randrange(3, 6) threads.append(Thread(target=waiter, args=(event, nloops))) threads[-1].start() threads.append(Thread(target=setter, args=(event, nloops))) threads[-1].start() for thread in threads: thread.join() print(“All done.”)Conditions
conditions是比events更加高級一點(diǎn)的同步原語,可以用戶多線程間的通信和通知。比如A線程通知B線程資源已經(jīng)可以被消費(fèi)。其他的線程必須在調(diào)用wait()方法前調(diào)用acquire()方法。同樣的,每個線程在資源使用完以后,要調(diào)用release()方法,這樣其他線程就可以繼續(xù)執(zhí)行了。下面是使用conditions實現(xiàn)的一個生產(chǎn)者消費(fèi)者的例子。
import random, time from threading import Condition, Thread condition = Condition() box = [] def producer(box, nitems): for i in range(nitems): time.sleep(random.randrange(2, 5)) # Sleeps for some time. condition.acquire() num = random.randint(1, 10) box.append(num) # Puts an item into box for consumption. condition.notify() # Notifies the consumer about the availability. print("Produced:", num) condition.release() def consumer(box, nitems): for i in range(nitems): condition.acquire() condition.wait() # Blocks until an item is available for consumption. print("%s: Acquired: %s" % (time.ctime(), box.pop())) condition.release() threads = [] nloops = random.randrange(3, 6) for func in [producer, consumer]: threads.append(Thread(target=func, args=(box, nloops))) threads[-1].start() # Starts the thread. for thread in threads: thread.join() print("All done.")
conditions還有其他很多用戶,比如實現(xiàn)一個數(shù)據(jù)流API,當(dāng)數(shù)據(jù)準(zhǔn)備好了可以通知其他線程去處理數(shù)據(jù)。
Barriersbarriers是個簡單的同步原語,可以用戶多個線程之間的相互等待。每個線程都調(diào)用wait()方法,然后阻塞,直到所有線程調(diào)用了wait(),然后所有線程同時開始運(yùn)行。例如:
from random import randrange from threading import Barrier, Thread from time import ctime, sleep num = 4 b = Barrier(num) names = [“Harsh”, “Lokesh”, “George”, “Iqbal”] def player(): name = names.pop() sleep(randrange(2, 5)) print(“%s reached the barrier at: %s” % (name, ctime())) b.wait() threads = [] print(“Race starts now…”) for i in range(num): threads.append(Thread(target=player)) threads[-1].start() for thread in threads: thread.join() print() print(“Race over!”)總結(jié)
多線程同步,說難也難,說不難也很容易,關(guān)鍵是要看你的業(yè)務(wù)場景和解決問題的思路,盡量降低多線程之間的依賴,理清楚業(yè)務(wù)流程,選擇合適的方法,則事盡成。
轉(zhuǎn)載自我的博客:捕蛇者說
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/40778.html
摘要:具有以下基本同步原語子進(jìn)程提供了通過創(chuàng)建和管理子進(jìn)程的。雖然隊列不是線程安全的,但它們被設(shè)計為專門用于代碼。表示異步操作的最終結(jié)果。 Python的asyncio是使用 async/await 語法編寫并發(fā)代碼的標(biāo)準(zhǔn)庫。通過上一節(jié)的講解,我們了解了它不斷變化的發(fā)展歷史。到了Python最新穩(wěn)定版 3.7 這個版本,asyncio又做了比較大的調(diào)整,把這個庫的API分為了 高層級API和...
摘要:定時檢測器定時拿出一部分重新的用過濾器進(jìn)行檢測剔除不能用的代理。重載是讓類以統(tǒng)一的方式處理不同類型數(shù)據(jù)的一種手段。雖然在內(nèi)存中存儲表數(shù)據(jù)確實會提供很高的性能,但當(dāng)守護(hù)進(jìn)程崩潰時,所有的數(shù)據(jù)都會丟失。第1題: 如何解決驗證碼的問題,用什么模塊,聽過哪些人工打碼平臺? PIL、pytesser、tesseract模塊 平臺的話有:(打碼平臺特殊,不保證時效性) 云打碼 掙碼 斐斐打碼 若快打碼...
摘要:定時檢測器定時拿出一部分重新的用過濾器進(jìn)行檢測剔除不能用的代理。重載是讓類以統(tǒng)一的方式處理不同類型數(shù)據(jù)的一種手段。雖然在內(nèi)存中存儲表數(shù)據(jù)確實會提供很高的性能,但當(dāng)守護(hù)進(jìn)程崩潰時,所有的數(shù)據(jù)都會丟失。第1題: 如何解決驗證碼的問題,用什么模塊,聽過哪些人工打碼平臺? PIL、pytesser、tesseract模塊 平臺的話有:(打碼平臺特殊,不保證時效性) 云打碼 掙碼 斐斐打碼 若快打碼...
摘要:首發(fā)于我的博客線程池進(jìn)程池網(wǎng)絡(luò)編程之同步異步阻塞非阻塞后端掘金本文為作者原創(chuàng),轉(zhuǎn)載請先與作者聯(lián)系。在了解的數(shù)據(jù)結(jié)構(gòu)時,容器可迭代對象迭代器使用進(jìn)行并發(fā)編程篇二掘金我們今天繼續(xù)深入學(xué)習(xí)。 Python 算法實戰(zhàn)系列之棧 - 后端 - 掘金原文出處: 安生??? 棧(stack)又稱之為堆棧是一個特殊的有序表,其插入和刪除操作都在棧頂進(jìn)行操作,并且按照先進(jìn)后出,后進(jìn)先出的規(guī)則進(jìn)行運(yùn)作。 如...
閱讀 1889·2021-11-22 09:34
閱讀 3176·2019-08-30 15:55
閱讀 749·2019-08-30 15:53
閱讀 2128·2019-08-30 15:52
閱讀 3057·2019-08-29 18:32
閱讀 2075·2019-08-29 17:15
閱讀 2456·2019-08-29 13:14
閱讀 3608·2019-08-28 18:05