亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

python并發(fā)2:使用asyncio處理并發(fā)

wushuiyong / 2848人閱讀

摘要:是之后引入的標準庫的,這個包使用事件循環(huán)驅(qū)動的協(xié)程實現(xiàn)并發(fā)。沒有能從外部終止線程,因為線程隨時可能被中斷。上一篇并發(fā)使用處理并發(fā)我們介紹過的,在中,只是調(diào)度執(zhí)行某物的結(jié)果。

asyncio

asyncio 是Python3.4 之后引入的標準庫的,這個包使用事件循環(huán)驅(qū)動的協(xié)程實現(xiàn)并發(fā)。
asyncio 包在引入標準庫之前代號 “Tulip”(郁金香),所以在網(wǎng)上搜索資料時,會經(jīng)常看到這種花的名字。

什么是事件循環(huán)?

wiki 上說:事件循環(huán)是”一種等待程序分配事件或者消息的編程架構(gòu)“?;旧蟻碚f事件循環(huán)就是:”當A發(fā)生時,執(zhí)行B"?;蛘哂米詈唵蔚睦觼斫忉屵@一概念就是每個瀏覽器中都存在的JavaScript事件循環(huán)。當你點擊了某個東西(“當A發(fā)生時”),這一點擊動作會發(fā)送給JavaScript的事件循環(huán),并檢查是否存在注冊過的onclick 回調(diào)來處理這一點擊(執(zhí)行B)。只要有注冊過的回調(diào)函數(shù)就會伴隨點擊動作的細節(jié)信息被執(zhí)行。事件循環(huán)被認為是一種虛幻是因為它不停的手機事件并通過循環(huán)來發(fā)如何應(yīng)對這些事件。

對 Python 來說,用來提供事件循環(huán)的 asyncio 被加入標準庫中。asyncio 重點解決網(wǎng)絡(luò)服務(wù)中的問題,事件循環(huán)在這里將來自套接字(socket)的 I/O 已經(jīng)準備好讀和/或?qū)懽鳛椤爱擜發(fā)生時”(通過selectors模塊)。除了 GUI 和 I/O,事件循環(huán)也經(jīng)常用于在別的線程或子進程中執(zhí)行代碼,并將事件循環(huán)作為調(diào)節(jié)機制(例如,合作式多任務(wù))。如果你恰好理解 Python 的 GIL,事件循環(huán)對于需要釋放 GIL 的地方很有用。

線程與協(xié)程

我們先看兩斷代碼,分別用 threading 模塊和asyncio 包實現(xiàn)的一段代碼。

# sinner_thread.py

import threading
import itertools
import time
import sys


class Signal: # 這個類定義一個可變對象,用于從外部控制線程
    go = True


def spin(msg, signal):  # 這個函數(shù)會在多帶帶的線程中運行,signal 參數(shù)是前邊定義的Signal類的實例
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-"):  # itertools.cycle 函數(shù)從指定的序列中反復(fù)不斷地生成元素
        status = char + " " + msg
        write(status)
        flush()
        write("x08" * len(status))  # 使用退格符把光標移回行首
        time.sleep(.1)  # 每 0.1 秒刷新一次
        if not signal.go:  # 如果 go屬性不是 True,退出循環(huán)
            break

    write(" " * len(status) + "x08" * len(status))  # 使用空格清除狀態(tài)消息,把光標移回開頭


def slow_function():  # 模擬耗時操作
    # 假裝等待I/O一段時間
    time.sleep(3)  # 調(diào)用sleep 會阻塞主線程,這么做事為了釋放GIL,創(chuàng)建從屬線程
    return 42


def supervisor():  # 這個函數(shù)設(shè)置從屬線程,顯示線程對象,運行耗時計算,最后殺死進程
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=("thinking!", signal))
    print("spinner object:", spinner)  # 顯示線程對象 輸出 spinner object: 
    spinner.start()  # 啟動從屬進程
    result = slow_function()  # 運行slow_function 行數(shù),阻塞主線程。同時叢書線程以動畫形式旋轉(zhuǎn)指針
    signal.go = False
    spinner.join()  # 等待spinner 線程結(jié)束
    return result

def main():
    result = supervisor()  
    print("Answer", result)


if __name__ == "__main__":
    main()

執(zhí)行一下,結(jié)果大致是這個樣子:

這是一個動圖,“thinking" 前的 線是會動的(為了錄屏,我把sleep 的時間調(diào)大了)

python 并沒有提供終止線程的API,所以若想關(guān)閉線程,必須給線程發(fā)送消息。這里我們使用signal.go 屬性:在主線程中把它設(shè)置為False后,spinner 線程會接收到,然后退出

現(xiàn)在我們再看下使用 asyncio 包的版本:

# spinner_asyncio.py
# 通過協(xié)程以動畫的形式顯示文本式旋轉(zhuǎn)指針

import asyncio
import itertools
import sys


@asyncio.coroutine # 打算交給asyncio 處理的協(xié)程要使用 @asyncio.coroutine 裝飾
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle("|/-"):  # itertools.cycle 函數(shù)從指定的序列中反復(fù)不斷地生成元素
        status = char + " " + msg
        write(status)
        flush()
        write("x08" * len(status))  # 使用退格符把光標移回行首
        try:
            yield from asyncio.sleep(0.1)  # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 這樣的休眠不會阻塞事件循環(huán)
        except asyncio.CancelledError:  # 如果 spin 函數(shù)蘇醒后拋出 asyncio.CancelledError 異常,其原因是發(fā)出了取消請求
            break

    write(" " * len(status) + "x08" * len(status))  # 使用空格清除狀態(tài)消息,把光標移回開頭


@asyncio.coroutine
def slow_function():  # 5 現(xiàn)在此函數(shù)是協(xié)程,使用休眠假裝進行I/O 操作時,使用 yield from 繼續(xù)執(zhí)行事件循環(huán)
    # 假裝等待I/O一段時間
    yield from asyncio.sleep(3)  # 此表達式把控制權(quán)交給主循環(huán),在休眠結(jié)束后回復(fù)這個協(xié)程
    return 42


@asyncio.coroutine
def supervisor():  #這個函數(shù)也是協(xié)程,因此可以使用 yield from 驅(qū)動 slow_function
    spinner = asyncio.async(spin("thinking!"))  # asyncio.async() 函數(shù)排定協(xié)程的運行時間,使用一個 Task 對象包裝spin 協(xié)程,并立即返回
    print("spinner object:", spinner)  # Task 對象,輸出類似 spinner object: >
    # 驅(qū)動slow_function() 函數(shù),結(jié)束后,獲取返回值。同事事件循環(huán)繼續(xù)運行,
    # 因為slow_function 函數(shù)最后使用yield from asyncio.sleep(3) 表達式把控制權(quán)交給主循環(huán)
    result = yield from slow_function()
    # Task 對象可以取消;取消后會在協(xié)程當前暫停的yield處拋出 asyncio.CancelledError 異常
    # 協(xié)程可以捕獲這個異常,也可以延遲取消,甚至拒絕取消
    spinner.cancel()

    return result

def main():
    loop = asyncio.get_event_loop()  # 獲取事件循環(huán)引用
    # 驅(qū)動supervisor 協(xié)程,讓它運行完畢;這個協(xié)程的返回值是這次調(diào)用的返回值
    result = loop.run_until_complete(supervisor())
    loop.close()
    print("Answer", result)


if __name__ == "__main__":
    main()

除非想阻塞主線程,從而凍結(jié)事件循環(huán)或整個應(yīng)用,否則不要再 asyncio 協(xié)程中使用 time.sleep().
如果協(xié)程需要在一段時間內(nèi)什么都不做,應(yīng)該使用 yield from asyncio.sleep(DELAY)

使用 @asyncio.coroutine 裝飾器不是強制要求,但建議這么做因為這樣能在代碼中突顯協(xié)程,如果還沒從中產(chǎn)出值,協(xié)程就把垃圾回收了(意味著操作未完成,可能有缺陷),可以發(fā)出警告。這個裝飾器不會預(yù)激協(xié)程。

這兩段代碼的執(zhí)行結(jié)果基本相同,現(xiàn)在我們看一下兩段代碼的核心代碼 supervisor 主要區(qū)別:

asyncio.Task 對象差不多與 threading.Thread 對象等效(Task 對象像是實現(xiàn)寫作時多任務(wù)的庫中的綠色線程

Task 對象用于驅(qū)動協(xié)程,Thread 對象用于調(diào)用可調(diào)用的對象

Task 對象不由自己動手實例化,而是通過把協(xié)程傳給 asyncio.async(...) 函數(shù)或 loop.create_task(...) 方法獲取

獲取的Task 對象已經(jīng)排定了運行時間;Thread 實例必須調(diào)用start方法,明確告知它運行

在線程版supervisor函數(shù)中,slow_function 是普通的函數(shù),由線程直接調(diào)用,而異步版的slow_function 函數(shù)是協(xié)程,由yield from 驅(qū)動。

沒有API能從外部終止線程,因為線程隨時可能被中斷。而如果想終止任務(wù),可以使用Task.cancel() 實例方法,在協(xié)程內(nèi)部拋出CancelledError 異常。協(xié)程可以在暫停的yield 處捕獲這個異常,處理終止請求

supervisor 協(xié)程必須在main 函數(shù)中由loop.run_until_complete 方法執(zhí)行。

協(xié)程和線程相比關(guān)鍵的一個優(yōu)點是,
線程必須記住保留鎖,去保護程序中的重要部分,防止多步操作再執(zhí)行的過程中中斷,防止山水處于于曉狀態(tài)
協(xié)程默認會做好保護,我們必須顯式產(chǎn)出(使用yield 或 yield from 交出控制權(quán))才能讓程序的余下部分運行。

asyncio.Future:故意不阻塞

asynci.Future 類與 concurrent.futures.Future 類的接口基本一致,不過實現(xiàn)方式不同,不可互換。

上一篇[python并發(fā) 1:使用 futures 處理并發(fā)]()我們介紹過 concurrent.futures.Future 的 future,在 concurrent.futures.Future 中,future只是調(diào)度執(zhí)行某物的結(jié)果。在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一個協(xié)程,排定它的運行時間,然后返回一個asyncio.Task 實例(也是asyncio.Future 類的實例,因為 Task 是 Future 的子類,用于包裝協(xié)程。(在 concurrent.futures.Future 中,類似的操作是Executor.submit(...))。

與concurrent.futures.Future 類似,asyncio.Future 類也提供了

.done() 返回布爾值,表示Future 是否已經(jīng)執(zhí)行

.add_done_callback() 這個方法只有一個參數(shù),類型是可調(diào)用對象,F(xiàn)uture運行結(jié)束后會回調(diào)這個對象。

.result() 這個方法沒有參數(shù),因此不能指定超時時間。 如果調(diào)用 .result() 方法時期還沒有運行完畢,會拋出 asyncio.InvalidStateError 異常。

對應(yīng)的 concurrent.futures.Future 類中的 Future 運行結(jié)束后調(diào)用result(), 會返回可調(diào)用對象的結(jié)果或者拋出執(zhí)行可調(diào)用對象時拋出的異常,如果是 Future 沒有運行結(jié)束時調(diào)用 f.result()方法,這時會阻塞調(diào)用方所在的線程,直到有結(jié)果返回。此時result 方法還可以接收 timeout 參數(shù),如果在指定的時間內(nèi) Future 沒有運行完畢,會拋出 TimeoutError 異常。

我們使用asyncio.Future 時, 通常使用yield from,從中獲取結(jié)果,而不是使用 result()方法 yield from 表達式在暫停的協(xié)程中生成返回值,回復(fù)執(zhí)行過程。

asyncio.Future 類的目的是與 yield from 一起使用,所以通常不需要使用以下方法:

不需調(diào)用 my_future.add_down_callback(...), 因為可以直接把想在 future 運行結(jié)束后的操作放在協(xié)程中 yield from my_future 表達式的后邊。(因為協(xié)程可以暫停和恢復(fù)函數(shù))

無需調(diào)用 my_future.result(), 因為 yield from 產(chǎn)生的結(jié)果就是(result = yield from my_future)

在 asyncio 包中,可以使用yield from 從asyncio.Future 對象中產(chǎn)出結(jié)果。這也就意味著我們可以這么寫:

res = yield from foo()  # foo 可以是協(xié)程函數(shù),也可以是返回 Future 或 task 實例的普通函數(shù)
asyncio.async(...)* 函數(shù)
asyncio.async(coro_or_future, *, loop=None)

這個函數(shù)統(tǒng)一了協(xié)程和Future: 第一個參數(shù)可以是二者中的任意一個。如果是Future 或者 Task 對象,就直接返回,如果是協(xié)程,那么async 函數(shù)會自動調(diào)用 loop.create_task(...) 方法創(chuàng)建 Task 對象。 loop 參數(shù)是可選的,用于傳入事件循環(huán); 如果沒有傳入,那么async函數(shù)會通過調(diào)用asyncio.get_event_loop() 函數(shù)獲取循環(huán)對象。

BaseEventLoop.create_task(coro)

這個方法排定協(xié)程的執(zhí)行時間,返回一個 asyncio.Task 對象。如果在自定義的BaseEventLoop 子類上調(diào)用,返回的對象可能是外部庫中與Task類兼容的某個類的實例。

BaseEventLoop.create_task() 方法只在Python3.4.2 及以上版本可用。 Python3.3 只能使用 asyncio.async(...)函數(shù)。

如果想在Python控制臺或者小型測試腳本中實驗future和協(xié)程,可以使用下面的片段:

import asyncio

def run_sync(coro_or_future):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro_or_future)

a = run_sync(some_coroutine())
使用asyncio 和 aiohttp 包下載

現(xiàn)在,我們了解了asyncio 的基礎(chǔ)知識,是時候使用asyncio 來重寫我們 上一篇 [python并發(fā) 1:使用 futures 處理并發(fā)]() 下載國旗的腳本了。

先看一下代碼:

import asyncio

import aiohttp  # 需要pip install aiohttp

from flags import save_flag, show, main, BASE_URL


@asyncio.coroutine  # 我們知道,協(xié)程應(yīng)該使用 asyncio.coroutine 裝飾
def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
     # 阻塞的操作通過協(xié)程實現(xiàn),客戶代碼通過yield from 把指責(zé)委托給協(xié)程,以便異步操作
    resp = yield from aiohttp.request("GET", url) 
    # 讀取也是異步操作
    image = yield from resp.read()
    return image


@asyncio.coroutine
def download_one(cc):  # 這個函數(shù)也必須是協(xié)程,因為用到了yield from
    image = yield from get_flag(cc) 
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc


def download_many(cc_list):
    loop = asyncio.get_event_loop()  # 獲取事件序號底層實現(xiàn)的引用
    to_do = [download_one(cc) for cc in sorted(cc_list)] # 調(diào)用download_one 獲取各個國旗,構(gòu)建一個生成器對象列表
    # 雖然函數(shù)名稱是wait 但它不是阻塞型函數(shù),wait 是一個協(xié)程,等傳給他的所有協(xié)程運行完畢后結(jié)束
    wait_coro = asyncio.wait(to_do)
    res, _ = loop.run_until_complete(wait_coro) # 執(zhí)行事件循環(huán),知道wait_coro 運行結(jié)束;事件循環(huán)運行的過程中,這個腳本會在這里阻塞。
    loop.close() # 關(guān)閉事件循環(huán)
    return len(res)

if __name__ == "__main__":
    main(download_many)

這段代碼的運行簡述如下:

在download_many 函數(shù)獲取一個事件循環(huán),處理調(diào)用download_one 函數(shù)生成的幾個協(xié)程對象

asyncio 事件循環(huán)一次激活各個協(xié)程

客戶代碼中的協(xié)程(get_flag)使用 yield from 把指責(zé)委托給庫里的協(xié)程(aiohttp.request)時,控制權(quán)交還給事件循環(huán),執(zhí)行之前排定的協(xié)程

事件循環(huán)通過基于回調(diào)的底層API,在阻塞的操作執(zhí)行完畢后獲得通知。

獲得通知后,主循環(huán)把結(jié)果發(fā)給暫停的協(xié)程

協(xié)程向前執(zhí)行到下一個yield from 表達式,例如 get_flag 函數(shù)的yield from resp.read()。事件循環(huán)再次得到控制權(quán),重復(fù)第4~6步,直到循環(huán)終止。

download_many 函數(shù)中,我們使用了 asyncio.wait(...) 函數(shù),這個函數(shù)是一個協(xié)程,協(xié)程的參數(shù)是一個由future或者協(xié)程構(gòu)成的可迭代對象;wait 會分別把各個協(xié)程包裝進一個Task對象。最終的結(jié)果是,wait 處理的所有對象都通過某種方式變成Future 類的實例。

wait 是協(xié)程函數(shù),因此,返回的是一個協(xié)程或者生成器對象;waite_coro 變量中存儲的就是這種對象

loop.run_until_complete 方法的參數(shù)是一個future 或協(xié)程。如果是協(xié)程,run_until_complete 方法與 wait 函數(shù)一樣,把協(xié)程包裝進一個Task 對象中。這里 run_until_complete 方法把 wait_coro 包裝進一個Task 對象中,由yield from 驅(qū)動。wait_coro 運行結(jié)束后返回兩個參數(shù),第一個參數(shù)是結(jié)束的future 第二個參數(shù)是未結(jié)束的future。

wait
有兩個命名參數(shù),timeout 和 return_when 如果設(shè)置了可能會返回未結(jié)束的future。

有一點你可能也注意到了,我們重寫了get_flags 函數(shù),是因為之前用到的 requests 庫執(zhí)行的是阻塞型I/O操作。為了使用 asyncio 包,我們必須把函數(shù)改成異步版。

小技巧

如果你覺得 使用了協(xié)程后代碼難以理解,可以采用 Python之父(Guido van Rossum)的建議,假裝沒有yield from。

已上邊這段代碼為例:

@asyncio.coroutine
def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request("GET", url) 
    image = yield from resp.read()
    return image

# 把yield form 去掉

def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = aiohttp.request("GET", url) 
    image = resp.read()
    return image

# 現(xiàn)在是不是清晰多了
知識點

在asyncio 包的API中使用 yield from 時,有個細節(jié)要注意:

使用asyncio包時,我們編寫的異步代碼中包含由asyncio本身驅(qū)動的協(xié)程(委派生成器),而生成器最終把指責(zé)委托給asyncio包或者第三方庫中的協(xié)程。這種處理方式相當于架起了管道,讓asyncio事件循環(huán)驅(qū)動執(zhí)行底層異步I/O的庫函數(shù)。

避免阻塞型調(diào)用

我們先看一個圖,這個圖顯示了電腦從不同存儲介質(zhì)中讀取數(shù)據(jù)的延遲情況:

通過這個圖,我們可以看到,阻塞型調(diào)用對于CPU來說是巨大的浪費。有什么辦法可以避免阻塞型調(diào)用中止整個應(yīng)用程序么?

有兩種方法:

在多帶帶的線程中運行各個阻塞型操作

把每個阻塞型操作轉(zhuǎn)化成非阻塞的異步調(diào)用使用

當然我們推薦第二種方案,因為第一種方案中如果每個連接都使用一個線程,成本太高。
第二種我們可以使用把生成器當做協(xié)程使用的方式實現(xiàn)異步編程。對事件循環(huán)來說,調(diào)用回調(diào)與在暫停的協(xié)程上調(diào)用 .send() 方法效果差不多。各個暫停的協(xié)程消耗的內(nèi)存比線程小的多。

現(xiàn)在,你應(yīng)該能理解為什么 flags_asyncio.py 腳本比 flags.py 快的多了吧。

因為flags.py 是依次同步下載,每次下載都要用幾十億個CPU周期等待結(jié)果。而在flags_asyncio.py中,在download_many 函數(shù)中調(diào)用loop.run_until_complete 方法時,事件循環(huán)驅(qū)動各個download_one 協(xié)程,運行到y(tǒng)ield from 表達式出,那個表達式又驅(qū)動各個 get_flag 協(xié)程,運行到第一個yield from 表達式處,調(diào)用 aiohttp.request()函數(shù)。這些調(diào)用不會阻塞,因此在零點幾秒內(nèi)所有請求都可以全部開始。

改進 asyncio 下載腳本

現(xiàn)在我們改進一下上邊的 flags_asyncio.py,在其中添加上異常處理,計數(shù)器

import asyncio
import collections
from collections import namedtuple
from enum import Enum

import aiohttp
from aiohttp import web

from flags import save_flag, show, main, BASE_URL

DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

Result = namedtuple("Result", "status data")
HTTPStatus = Enum("Status", "ok not_found error")

# 自定義異常用于包裝其他HTTP貨網(wǎng)絡(luò)異常,并獲取country_code,以便報告錯誤
class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code


@asyncio.coroutine
def get_flag(cc):
    # 此協(xié)程有三種返回結(jié)果:
    # 1.  返回下載到的圖片
    # 2. HTTP 響應(yīng)為404 時,拋出web.HTTPNotFound 異常
    # 3. 返回其他HTTP狀態(tài)碼時, 拋出aiohttp.HttpProcessingError
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request("GET", url)
    if resp.status == 200:
        image = yield from resp.read()
        return image
    elif resp.status == 404:
        raise web.HttpNotFound()
    else:
        raise aiohttp.HttpProcessionError(
            code=resp.status, message=resp.reason,
            headers=resp.headers
        )


@asyncio.coroutine
def download_one(cc, semaphore):
    # semaphore 參數(shù)是 asyncio.Semaphore 類的實例
    # Semaphore 類是同步裝置,用于限制并發(fā)請求
    try:
        with (yield from semaphore):
             # 在yield    from  表達式中把semaphore   當成上下文管理器使用,防止阻塞整個系統(tǒng)
             # 如果semaphore 計數(shù)器的值是所允許的最大值,只有這個協(xié)程會阻塞
              image = yield from get_flag(cc)
              # 退出with語句后 semaphore 計數(shù)器的值會遞減,
              # 解除阻塞可能在等待同一個semaphore對象的其他協(xié)程實例
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        save_flag(image, cc.lower() + ".gif")
        status = HTTPStatus.ok
        msg = "ok"
    return Result(status, cc)

@asyncio.coroutine
def downloader_coro(cc_list):
    counter = collections.Counter()
    # 創(chuàng)建一個 asyncio.Semaphore 實例,最多允許激活MAX_CONCUR_REQ個使用這個計數(shù)器的協(xié)程
    semaphore = asyncio.Semaphore(MAX_CONCUR_REQ)
    # 多次調(diào)用 download_one 協(xié)程,創(chuàng)建一個協(xié)程對象列表
    to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
    # 獲取一個迭代器,這個迭代器會在future運行結(jié)束后返回future
    to_do_iter = asyncio.as_completed(to_do)
    for future in to_do_iter:
        # 迭代允許結(jié)束的 future    
        try:
            res = yield from future # 獲取asyncio.Future 對象的結(jié)果(也可以調(diào)用future.result)
        except FetchError as exc:
            # 拋出的異常都包裝在FetchError  對象里
            country_code = exc.country_code
            try:
                # 嘗試從原來的異常 (__cause__)中獲取錯誤消息
                error_msg = exc.__cause__.args[0]
            except IndexError:
                # 如果在原來的異常中找不到錯誤消息,使用所連接異常的類名作為錯誤消息
                error_msg = exc.__cause__.__class__.__name__
            if error_msg:
                msg = "*** Error for {}: {}"
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        counter[status] += 1
    return counter

def download_many(cc_list):
    loop = asyncio.get_event_loop()
    coro = downloader_coro(cc_list)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts


if __name__ == "__main__":
    main(download_many)

由于協(xié)程發(fā)起的請求速度較快,為了防止向服務(wù)器發(fā)起太多的并發(fā)請求,使服務(wù)器過載,我們在download_coro 函數(shù)中創(chuàng)建一個asyncio.Semaphore 實例,然后把它傳給download_one 函數(shù)。

Semaphore

對象維護著一個內(nèi)部計數(shù)器,若在對象上調(diào)用 .acquire() 協(xié)程方法,計數(shù)器則遞減;若在對象上調(diào)用 .release() 協(xié)程方法,計數(shù)器則遞增。計數(shù)器的值是在初始化的時候設(shè)定。
如果計數(shù)器大于0,那么調(diào)用 .acquire() 方法不會阻塞,如果計數(shù)器為0, .acquire() 方法會阻塞調(diào)用這個方法的協(xié)程,直到其他協(xié)程在同一個 Semaphore 對象上調(diào)用 .release() 方法,讓計數(shù)器遞增。

在上邊的代碼中,我們并沒有手動調(diào)用 .acquire() 或 .release() 方法,而是在 download_one 函數(shù)中 把 semaphore 當做上下文管理器使用:

with (yield from semaphore):
    image = yield from get_flag(cc)

這段代碼保證,任何時候都不會有超過 MAX_CONCUR_REQ 個 get_flag 協(xié)程啟動。

使用 asyncio.as_completed 函數(shù)

因為要使用 yield from 獲取 asyncio.as_completed 函數(shù)產(chǎn)出的future的結(jié)果,所以 as_completed 函數(shù)秩序在協(xié)程中調(diào)用。由于 download_many 要作為參數(shù)傳給非協(xié)程的main 函數(shù),我已我們添加了一個新的 downloader_coro 協(xié)程,讓download_many 函數(shù)只用于設(shè)置事件循環(huán)。

使用Executor 對象,防止阻塞事件循環(huán)

現(xiàn)在我們回去看下上邊關(guān)于電腦從不同存儲介質(zhì)讀取數(shù)據(jù)的延遲情況圖,有一個實時需要注意,那就是訪問本地文件系統(tǒng)也會阻塞。
上邊的代碼中,save_flag 函數(shù)阻塞了客戶代碼與 asyncio 事件循環(huán)公用的唯一線程,因此保存文件時,整個應(yīng)用程序都會暫停。為了避免這個問題,可以使用事件循環(huán)對象的 run_in_executor 方法。

asyncio 的事件循環(huán)在后臺維護著一個ThreadPoolExecutor 對象,我們可以調(diào)用 run_in_executor 方法,把可調(diào)用的對象發(fā)給它執(zhí)行。
下邊是我們改動后的代碼:

@asyncio.coroutine
def download_one(cc, semaphore):
    try:
        with (yield from semaphore):
            image = yield from get_flag(cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        # 這里是改動部分
        loop = asyncio.get_event_loop()  # 獲取事件循環(huán)的引用
        loop.run_in_executor(None, save_flag, image, cc.lower() + ".gif")
        status = HTTPStatus.ok
        msg = "ok"
    return Result(status, cc)

run_in_executor 方法的第一個參數(shù)是Executor 實例;如果設(shè)為None,使用事件循環(huán)的默認 ThreadPoolExecutor 實例。

從回調(diào)到future到協(xié)程

在接觸協(xié)程之前,我們可能對回調(diào)有一定的認識,那么和回調(diào)相比,協(xié)程有什么改進呢?

python中的回調(diào)代碼樣式:

def stage1(response1):
    request2 = step1(response1)
    api_call2(request2, stage2)
    
def stage2(response2):
    request3 = step3(response3)
    api_call3(request3, stage3)   

 def stage3(response3):
     step3(response3) 

api_call1(request1, stage1)

上邊的代碼的缺陷:

容易出現(xiàn)回調(diào)地獄

代碼難以閱讀

在這個問題上,協(xié)程能發(fā)揮很大的作用。如果換成協(xié)程和yield from 結(jié)果做的異步代碼,代碼示例如下:

@asyncio.coroutine
def three_stages(request1):
    response1 = yield from api_call1(request1)
    request2 = step1(response1)
    response2 = yield from api_call2(requests)
    request3 = step2(response2)
    response3 = yield from api_call3(requests)
    step3(response3)  
    
loop.create_task(three_stages(request1)

和之前的代碼相比,這個代碼就容易理解多了。如果異步調(diào)用 api_call1,api_call2,api_call3 會拋出異常,那么可以把相應(yīng)的 yield from 表達式放在 try/except 塊中處理異常。
使用協(xié)程必須習(xí)慣 yield from 表達式,并且協(xié)程不能直接調(diào)用,必須顯式的排定協(xié)程的執(zhí)行時間,或在其他排定了執(zhí)行時間的協(xié)程中使用yield from 表達式吧它激活。如果不使用 loop.create_task(three_stages(request1)),那么什么都不會發(fā)生。

下面我們用一個實際的例子來演示一下:

每次下載發(fā)起多次請求

我們修改一下上邊下載國旗的代碼,使在下載國旗的同時還可以獲取國家名稱在保存圖片的時候使用。
我們使用協(xié)程和yield from 解決這個問題:

@asyncio.coroutine
def http_get(url):
    resp = yield from aiohttp.request("GET", url)
    if resp.status == 200:
        ctype = resp.headers.get("Content-type", "").lower()
        if "json" in ctype or url.endswith("json"):
            data = yield from resp.json()
        else:
            data = yield from resp.read()
        return data
    elif resp.status == 404:
        raise web.HttpNotFound()
    else:
        raise aiohttp.HttpProcessionError(
            code=resp.status, message=resp.reason,
            headers=resp.headers)


@asyncio.coroutine
def get_country(cc):
    url = "{}/{cc}/metadata.json".format(BASE_URL, cc=cc.lower())
    metadata = yield from http_get(url)
    return metadata["country"]


@asyncio.coroutine
def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    return (yield from http_get(url))


@asyncio.coroutine
def download_one(cc, semaphore):
    try:
        with (yield from semaphore):
            image = yield from get_flag(cc)
        with (yield from semaphore):
            country = yield from get_country(cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        country = country.replace(" ", "_")
        filename = "{}--{}.gif".format(country, cc)
        print(filename)
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, filename)
        status = HTTPStatus.ok
        msg = "ok"
    return Result(status, cc)

在這段代碼中,我們在download_one 函數(shù)中分別在 semaphore 控制的兩個with 塊中調(diào)用get_flag 和 get_country,是為了節(jié)約時間。

get_flag 的return 語句在外層加上括號,是因為() 的運算符優(yōu)先級高,會先執(zhí)行括號內(nèi)的yield from 語句 返回的結(jié)果。如果不加 會報句法錯誤
加() ,相當于

image = yield from http_get(url)
return image

如果不加(),那么程序會在 yield from 處中斷,交出控制權(quán),這時使用return 會報句法錯誤。

總結(jié)

這一篇我們討論了:

對比了一個多線程程序和asyncio版,說明了多線程和異步任務(wù)之間的關(guān)系

比較了 asyncio.Future 類 和 concurrent.futures.Future 類的區(qū)別

如何使用異步編程管理網(wǎng)絡(luò)應(yīng)用中的高并發(fā)

在異步編程中,與回調(diào)相比,協(xié)程顯著提升性能的方式

下一篇,我們將介紹如何使用asyncio包編寫服務(wù)器

參考鏈接

class asyncio.Semaphore

asyncio — Asynchronous I/O, event loop, coroutines and tasks

【譯】 Python 3.5 協(xié)程究竟是個啥

PEP 0492 Coroutines with async and await syntax

Python 之 asyncio

我所不能理解的Python中的Asyncio模塊

最后,感謝女朋友支持

>歡迎關(guān)注 >請我喝芬達
GPU云服務(wù)器 云服務(wù)器 python處理并發(fā) 并發(fā)處理 大數(shù)據(jù)并發(fā)處理 webrtc并發(fā)處理能力

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/38646.html

相關(guān)文章

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<