亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

python并發(fā) 1:使用 futures 處理并發(fā)

Kyxy / 850人閱讀

摘要:標(biāo)準(zhǔn)庫(kù)中所有阻塞型函數(shù)都會(huì)釋放,允許其他線程運(yùn)行。如果調(diào)用引發(fā)異常,那么當(dāng)從迭代器檢索其值時(shí),將引發(fā)異常??偨Y(jié)自版就支持線程了,只不過(guò)是使用線程的最新方式。類封裝了模塊的組件,使使用線程變得更加方便。下一篇筆記應(yīng)該是使用處理并發(fā)。

作為Python程序員,平時(shí)很少使用并發(fā)編程,偶爾使用也只需要派生出一批獨(dú)立的線程,然后放到隊(duì)列中,批量執(zhí)行。所以,不夸張的說(shuō),雖然我知道線程、進(jìn)程、并行、并發(fā)的概念,但每次使用的時(shí)候可能還需要再打開文檔回顧一下。

現(xiàn)在這一篇還是 《流暢的python》讀書筆記,譯者在這里把future 翻譯為“期物”,我覺(jué)得不太合適,既然future不能找到一個(gè)合適的詞匯,暫時(shí)還是直接使用 future 吧。

concurrent.futures

future 是一種對(duì)象,表示異步執(zhí)行的操作。這個(gè)概念是 concurrent.futures模塊和asyncio包的基礎(chǔ)。

concurrent.futures 模塊是Python3.2 引入的,對(duì)于Python2x 版本,Python2.5 以上的版本可以安裝 futures 包來(lái)使用這個(gè)模塊。

從Python3.4起,標(biāo)準(zhǔn)庫(kù)中有兩個(gè)為Future的類:concurrent.futures.Future 和 asyncio.Future。這兩個(gè)類作用相同:兩個(gè)Future類的實(shí)例都表示可能已經(jīng)完成或未完成的延遲計(jì)算。

Future 封裝待完成的操作,可放入隊(duì)列,完成的狀態(tài)可以查詢,得到結(jié)果(或拋出異常)后可以獲取結(jié)果(或異常)。

我們知道,如果程序中包含I/O操作,程序會(huì)有很高的延遲,CPU會(huì)處于等待狀態(tài),這時(shí)如果我們不使用并發(fā)會(huì)浪費(fèi)很多時(shí)間。

示例

我們先舉個(gè)例子:

下邊是有兩段代碼,主要功能都是從網(wǎng)上下載人口前20的國(guó)際的國(guó)旗:
第一段代碼(flagss.py)是依序下載:下載完一個(gè)圖片后保存到硬盤,然后請(qǐng)求下一張圖片;
第二段代碼(flagss_threadpool.py)使用 concurrent.futures 模塊,批量下載10張圖片。

運(yùn)行分別運(yùn)行兩段代碼3次,結(jié)果如下:

images.py 的結(jié)果如下

$ python flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 6.18s

$ python flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 5.67s

$ python flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 6.55s

可以看到,依次下載10張圖片,平均需要6秒

flags_threadpool.py 的結(jié)果如下:

$ python flags_threadpool.py
NG EG VN BR JP FR DE CN TR BD PK MX PH US RU IN ET CD ID IR 
20 flags downloaded in 2.12s

$ python flags_threadpool.py
BR IN DE FR TR RU EG NG JP CN ID ET PK MX PH US IR CD VN BD 
20 flags downloaded in 2.23s

$ python flags_threadpool.py
CN BR DE ID NG RU TR IN MX US IR BD VN CD PH EG FR JP ET PK 
20 flags downloaded in 1.18s

使用 concurrent.futures 后,下載10張圖片平均需要2秒

通過(guò)上邊的結(jié)果我們發(fā)現(xiàn)使用 concurrent.futures 后,下載效率大幅提升。

下邊我們來(lái)看下這兩段代碼。

同步執(zhí)行的代碼flags.py:

#! -*- coding: utf-8 -*-

import os
import time
import sys

import requests  # <1>

POP20_CC = ("CN IN US ID BR PK NG BD RU JP "
            "MX PH VN ET EG DE IR TR CD FR").split()  # <2>

BASE_URL = "http://flupy.org/data/flags"  # <3>

DEST_DIR = "images/"  # <4>


# 保存圖片
def save_flag(img, filename):  # <5>
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp:
        fp.write(img)


# 下載圖片
def get_flag(cc):  # <6>
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    # 這里我們使用 requests 包,需要先通過(guò)pypi安裝
    resp = requests.get(url)
    return resp.content


# 顯示一個(gè)字符串,然后刷新sys.stdout,目的是在一行消息中看到進(jìn)度
def show(text):  # <7>
    print(text, end=" ")
    sys.stdout.flush()


def download_many(cc_list):  # <8>
    for cc in sorted(cc_list):  # <9>
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + ".gif")

    return len(cc_list)


def main(download_many):  # <10>
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = "
{} flags downloaded in {:.2f}s"
    print(msg.format(count, elapsed))


if __name__ == "__main__":
    main(download_many)  # <11>

使用 concurrent.future 并發(fā)的代碼 flags_threadpool.py

#! -*- coding: utf-8 -*-

from concurrent import futures

from flags import save_flag, get_flag, show, main

# 設(shè)定ThreadPoolExecutor 類最多使用幾個(gè)線程
MAX_WORKERS = 20


# 下載一個(gè)圖片
def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc


def download_many(cc_list):
    # 設(shè)定工作的線程數(shù)量,使用約需的最大值與要處理的數(shù)量直接較小的那個(gè)值,以免創(chuàng)建多余的線程
    workers = min(MAX_WORKERS, len(cc_list))  # <4>
    # 使用工作的線程數(shù)實(shí)例化ThreadPoolExecutor類;
    # executor.__exit__方法會(huì)調(diào)用executor.shutdown(wait=True)方法,
    # 它會(huì)在所有線程都執(zhí)行完畢前阻塞線程
    with futures.ThreadPoolExecutor(workers) as executor:  # <5>
        # map 與內(nèi)置map方法類似,不過(guò)download_one 函數(shù)會(huì)在多個(gè)線程中并發(fā)調(diào)用;
        # map 方法返回一個(gè)生成器,因此可以迭代,
        # 迭代器的__next__方法調(diào)用各個(gè)Future 的 result 方法
        res = executor.map(download_one, sorted(cc_list))

    # 返回獲取的結(jié)果數(shù)量;如果有現(xiàn)成拋出異常,會(huì)在這里拋出
    # 這與隱式調(diào)用next() 函數(shù)從迭代器中獲取相應(yīng)的返回值一樣。
    return len(list(res))  # <7>
    return len(results)


if __name__ == "__main__":
    main(download_many)

上邊的代碼,我們對(duì) concurrent.futures 的使用有了大致的了解。但 future 在哪里呢,我們并沒(méi)有看到。

Future 是 concurrent.futures 模塊和 asyncio 包的重要組件。從Python3.4起,標(biāo)準(zhǔn)庫(kù)中有兩個(gè)為Future的類:concurrent.futures.Future 和 asyncio.Future。這兩個(gè)Future作用相同。

Future 封裝待完成的操作,可放入隊(duì)列,完成的狀態(tài)可以查詢,得到結(jié)果(或拋出異常)后可以獲取結(jié)果(或異常)。
Future 表示終將發(fā)生的事情,而確定某件事情會(huì)發(fā)生的唯一方式是執(zhí)行的時(shí)間已經(jīng)排定。因此只有把某件事交給 concurrent.futures.Executor 子類處理時(shí),才會(huì)創(chuàng)建 concurrent.futures.Future 實(shí)例。

例如,調(diào)用Executor.submit() 方法的參數(shù)是一個(gè)可調(diào)用的對(duì)象,調(diào)用這個(gè)方法后會(huì)為傳入的可調(diào)用對(duì)象排期,并返回一個(gè)Future。

Future 有三個(gè)重要的方法:

.done() 返回布爾值,表示Future 是否已經(jīng)執(zhí)行

.add_done_callback() 這個(gè)方法只有一個(gè)參數(shù),類型是可調(diào)用對(duì)象,F(xiàn)uture運(yùn)行結(jié)束后會(huì)回調(diào)這個(gè)對(duì)象。

.result() 如果 Future 運(yùn)行結(jié)束后調(diào)用result(), 會(huì)返回可調(diào)用對(duì)象的結(jié)果或者拋出執(zhí)行可調(diào)用對(duì)象時(shí)拋出的異常,如果是 Future 沒(méi)有運(yùn)行結(jié)束時(shí)調(diào)用 f.result()方法,這時(shí)會(huì)阻塞調(diào)用方所在的線程,直到有結(jié)果返回。此時(shí)result 方法還可以接收 timeout 參數(shù),如果在指定的時(shí)間內(nèi) Future 沒(méi)有運(yùn)行完畢,會(huì)拋出 TimeoutError 異常。

asyncio.Future.result 方法不支持設(shè)定超時(shí)時(shí)間,如果想獲取 Future 的結(jié)果,可以使用 yield from 結(jié)構(gòu)

為了加深對(duì) Future 的理解,現(xiàn)在我們修改下 flags_threadpool.py download_many 函數(shù)。

def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        # 用于創(chuàng)建并排定 future
        for cc in sorted(cc_list):
            # submit 方法排定可調(diào)用對(duì)象的執(zhí)行時(shí)間然后返回一個(gè)future,表示這個(gè)待執(zhí)行的操作
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = "Scheduled for {}: {}"
            print(msg.format(cc, future))
        
        results = []
        # 用于獲取future 結(jié)果
        # as_completed 接收一個(gè)future 列表,返回值是一個(gè)迭代器,在運(yùn)行結(jié)束后產(chǎn)出future
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = "{} result: {!r}"
            print(msg.format(future, res))
            results.append(res)
    
    return len(results)

現(xiàn)在執(zhí)行代碼,運(yùn)行結(jié)果如下:

Scheduled for BR: 
Scheduled for CN: 
Scheduled for ID: 
Scheduled for IN: 
Scheduled for US: 
BR  result: "BR"
IN  result: "IN"
CN  result: "CN"
ID  result: "ID"
US  result: "US"

5 flags downloaded in 1.47s

從結(jié)果可以看到,future 的 repr() 方法會(huì)顯示狀態(tài),前三個(gè) 是running 是因?yàn)槲覀冊(cè)O(shè)定了三個(gè)進(jìn)程,所以后兩個(gè)是pendding 狀態(tài)。如果將max_workers參數(shù)設(shè)置為5,結(jié)果就會(huì)全都是 running。

雖然,使用 future 的腳步比第一個(gè)腳本的執(zhí)行速度快了很多,但由于受GIL的限制,下載并不是并行的。

GIL(Global Interpreter Lock)和阻塞型I/O

CPython 解釋器本身不是線程安全的,因此解釋器被一個(gè)全局解釋器鎖保護(hù)著,它確保任何時(shí)候都只有一個(gè)Python線程執(zhí)行。

然而,Python標(biāo)準(zhǔn)庫(kù)中所有執(zhí)行阻塞型I/O操作的函數(shù),在等待系統(tǒng)返回結(jié)果時(shí)都會(huì)釋放GIL。這意味著I/O密集型Python程序能從中受益:一個(gè)Python線程等待網(wǎng)絡(luò)響應(yīng)時(shí),阻塞型I/O函數(shù)會(huì)釋放GIL,再運(yùn)行一個(gè)線程。

Python 標(biāo)準(zhǔn)庫(kù)中所有阻塞型I/O函數(shù)都會(huì)釋放GIL,允許其他線程運(yùn)行。time.sleep()函數(shù)也會(huì)釋放GIL。

那么如何在CPU密集型作業(yè)中使用 concurrent.futures 模塊繞開GIL呢?

答案是 使用 ProcessPoolExecutor 類。

使用這個(gè)模塊可以在做CPU密集型工作是繞開GIL,利用所有可用核心。

ThreadPoolExecutor 和 ProcessPoolExecutor 都實(shí)現(xiàn)了通用的 Executor 接口,所以,我們可以輕松的將基于線程的方案改為使用進(jìn)程的方案。

比如下邊這樣:

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        pass

# 改成
def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:
        pass

需要注意的是,ThreadPoolExecutor 需要指定 max_workers 參數(shù),
而 ProcessPoolExecutor 的這個(gè)參數(shù)是可選的默認(rèn)值是 os.cup_count()(計(jì)算機(jī)cpu核心數(shù))。

ProcessPoolExecutor 的價(jià)值主要體現(xiàn)在CPU密集型作業(yè)上。

使用Python處理CPU密集型工作,應(yīng)該試試PyPy,會(huì)有更高的執(zhí)行速度。

現(xiàn)在我們回到開始的代碼,看下 Executor.map 函數(shù)。

文檔中對(duì)map函數(shù)的介紹如下。

map(func, *iterables, timeout=None, chunksize=1)

等同于 map(func, *iterables),不同的是 func 是異步執(zhí)行的,并且可以同時(shí)進(jìn)行對(duì) func 的多個(gè)調(diào)用。如果調(diào)用 __next__(),則返回的迭代器提出 concurrent.futures.TimeoutError,并且在從 Executor.map() 的原始調(diào)用起的 timeout 秒之后結(jié)果不可用。 timeout 可以是int或float。如果未指定 timeout 或 None,則等待時(shí)間沒(méi)有限制。如果調(diào)用引發(fā)異常,那么當(dāng)從迭代器檢索其值時(shí),將引發(fā)異常。當(dāng)使用 ProcessPoolExecutor 時(shí),此方法將 iterables 分成多個(gè)塊,它作為多帶帶的任務(wù)提交到進(jìn)程池。這些塊的(近似)大小可以通過(guò)將 chunksize 設(shè)置為正整數(shù)來(lái)指定。對(duì)于非常長(zhǎng)的迭代,與默認(rèn)大小1相比,使用大值 chunksize 可以顯著提高性能。使用 ThreadPoolExecutor,chunksize 沒(méi)有效果。

在 3.5 版更改: 添加了 chunksize 參數(shù)。

Executor.map 還有個(gè)特性比較有用,那就是這個(gè)函數(shù)返回結(jié)果的順序于調(diào)用開始的順序是一致的。如果第一個(gè)調(diào)用稱其結(jié)果用時(shí)10秒,其他調(diào)用只用1秒,代碼會(huì)阻塞10秒,獲取map方法返回的生成器產(chǎn)出的第一個(gè)結(jié)果。

如果不是獲取到所有結(jié)果再處理,通常會(huì)使用 Executor.submit + Executor.as_completed 組合使用的方案。

Executor.submit + Executor.as_completed 這個(gè)組合更靈活,因?yàn)閟ubmit方法能處理不同的可調(diào)用對(duì)象和參數(shù),而executor.map 只能處理參數(shù)不同的同一個(gè)可調(diào)用對(duì)象。此外,傳給futures.as_completed 函數(shù)的期物集合可以來(lái)自不同的 Executor 實(shí)例。

future 的異常處理

futures 有三個(gè)異常類:

exception concurrent.futures.CancelledError 在future取消時(shí)引發(fā)。

exception concurrent.futures.TimeoutError 在future操作超過(guò)給定超時(shí)時(shí)觸發(fā)。

exception concurrent.futures.process.BrokenProcessPool
從 RuntimeError 派生,當(dāng) ProcessPoolExecutor 的一個(gè)工人以非干凈方式終止(例如,如果它從外部被殺死)時(shí),引發(fā)此異常類。

我們先看一下,future.result() 出現(xiàn)異常的處理情況。代碼改動(dòng)如下:

# 將第一個(gè) CN 改為CN1 也可以是其它任意錯(cuò)誤代碼
POP20_CC = ("CN1 IN US ID BR PK NG BD RU JP "
            "MX PH VN ET EG DE IR TR CD FR").split()


def get_flag(cc):  # <6>
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:  # <1>
        resp.raise_for_status() # 如果不是200 拋出異常
    return resp.content

def download_one(cc):
    try:
        image = get_flag(cc)
    # 捕獲 requests.exceptions.HTTPError
    except requests.exceptions.HTTPError as exc:  #
        # 如果有異常 直接拋出
        raise
    else:
        save_flag(image, cc.lower() + ".gif")
    return cc

現(xiàn)在執(zhí)行代碼,會(huì)發(fā)現(xiàn) download_one 中的異常傳遞到了download_many 中,并且導(dǎo)致拋出了異常,未執(zhí)行完的其它future 也都中斷。

為了能保證其它沒(méi)有錯(cuò)誤的future 可以正常執(zhí)行,這里我們需要對(duì)future.result() 做異常處理。

改動(dòng)結(jié)果如下:

def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=20) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do_map[future] = cc
            msg = "Scheduled for {}: {}"
            print(msg.format(cc, future))

        results = []
        for future in futures.as_completed(to_do_map):
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                # 處理可能出現(xiàn)的異常
                error_msg = "{} result {}".format(cc, exc)
            else:
                error_msg = ""
            if error_msg:
                cc = to_do_map[future]  # <16>
                print("*** Error for {}: {}".format(cc, error_msg))
            else:
                msg = "{} result: {!r}"
                print(msg.format(future, res))
                results.append(res)

    return len(results)

這里我們用到了一個(gè)對(duì) futures.as_completed 函數(shù)特別有用的慣用法:構(gòu)建一個(gè)字典,把各個(gè)future映射到其他數(shù)據(jù)(future運(yùn)行結(jié)束后可能用的)上。這樣,雖然 future生成的順序雖然已經(jīng)亂了,依然便于使用結(jié)果做后續(xù)處理。

一篇寫完了沒(méi)有總結(jié)總感覺(jué)少點(diǎn)什么,所以。

總結(jié)

Python 自 0.9.8 版就支持線程了,concurrent.futures 只不過(guò)是使用線程的最新方式。

futures.ThreadPoolExecutor 類封裝了 threading 模塊的組件,使使用線程變得更加方便。

順便再推薦一下 《流暢的python》,絕對(duì)值得一下。

下一篇筆記應(yīng)該是使用 asyncio 處理并發(fā)。

最后,感謝女朋友支持。

>歡迎關(guān)注 >請(qǐng)我喝芬達(dá)

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/38632.html

相關(guān)文章

  • Python基礎(chǔ)之使用期物處理并發(fā)

    摘要:本文重點(diǎn)掌握異步編程的相關(guān)概念了解期物的概念意義和使用方法了解中的阻塞型函數(shù)釋放的特點(diǎn)。一異步編程相關(guān)概念阻塞程序未得到所需計(jì)算資源時(shí)被掛起的狀態(tài)。 導(dǎo)語(yǔ):本文章記錄了本人在學(xué)習(xí)Python基礎(chǔ)之控制流程篇的重點(diǎn)知識(shí)及個(gè)人心得,打算入門Python的朋友們可以來(lái)一起學(xué)習(xí)并交流。 本文重點(diǎn): 1、掌握異步編程的相關(guān)概念;2、了解期物future的概念、意義和使用方法;3、了解Python...

    asoren 評(píng)論0 收藏0
  • Python中編寫并發(fā)程序

    摘要:在中由于歷史原因使得中多線程的效果非常不理想使得任何時(shí)刻只能利用一個(gè)核并且它的調(diào)度算法簡(jiǎn)單粗暴多線程中讓每個(gè)線程運(yùn)行一段時(shí)間然后強(qiáng)行掛起該線程繼而去運(yùn)行其他線程如此周而復(fù)始直到所有線程結(jié)束這使得無(wú)法有效利用計(jì)算機(jī)系統(tǒng)中的局部性頻繁的線程切換 GIL 在Python中,由于歷史原因(GIL),使得Python中多線程的效果非常不理想.GIL使得任何時(shí)刻Python只能利用一個(gè)CPU核,...

    _ipo 評(píng)論0 收藏0
  • python并發(fā)2:使用asyncio處理并發(fā)

    摘要:是之后引入的標(biāo)準(zhǔn)庫(kù)的,這個(gè)包使用事件循環(huán)驅(qū)動(dòng)的協(xié)程實(shí)現(xiàn)并發(fā)。沒(méi)有能從外部終止線程,因?yàn)榫€程隨時(shí)可能被中斷。上一篇并發(fā)使用處理并發(fā)我們介紹過(guò)的,在中,只是調(diào)度執(zhí)行某物的結(jié)果。 asyncio asyncio 是Python3.4 之后引入的標(biāo)準(zhǔn)庫(kù)的,這個(gè)包使用事件循環(huán)驅(qū)動(dòng)的協(xié)程實(shí)現(xiàn)并發(fā)。asyncio 包在引入標(biāo)準(zhǔn)庫(kù)之前代號(hào) Tulip(郁金香),所以在網(wǎng)上搜索資料時(shí),會(huì)經(jīng)??吹竭@種花的...

    wushuiyong 評(píng)論0 收藏0
  • Python 學(xué)習(xí)筆記 并發(fā) future

    摘要:和類是高級(jí)類,大部分情況下只要學(xué)會(huì)使用即可,無(wú)需關(guān)注其實(shí)現(xiàn)細(xì)節(jié)。類與類十分相似,只不過(guò)一個(gè)是處理進(jìn)程,一個(gè)是處理線程,可根據(jù)實(shí)際需要選擇。示例運(yùn)行結(jié)果不同機(jī)器運(yùn)行結(jié)果可能不同。 concurrent.futures模塊 該模塊主要特色在于ThreadPoolExecutor 和 ProcessPoolExecutor 類,這兩個(gè)類都繼承自concurrent.futures._base...

    lewif 評(píng)論0 收藏0
  • python基礎(chǔ)教程:異步IO 之 API

    摘要:具有以下基本同步原語(yǔ)子進(jìn)程提供了通過(guò)創(chuàng)建和管理子進(jìn)程的。雖然隊(duì)列不是線程安全的,但它們被設(shè)計(jì)為專門用于代碼。表示異步操作的最終結(jié)果。 Python的asyncio是使用 async/await 語(yǔ)法編寫并發(fā)代碼的標(biāo)準(zhǔn)庫(kù)。通過(guò)上一節(jié)的講解,我們了解了它不斷變化的發(fā)展歷史。到了Python最新穩(wěn)定版 3.7 這個(gè)版本,asyncio又做了比較大的調(diào)整,把這個(gè)庫(kù)的API分為了 高層級(jí)API和...

    vboy1010 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<