摘要:上一篇文章進(jìn)程專題繼承來創(chuàng)建進(jìn)程下一篇文章進(jìn)程專題進(jìn)程間通信當(dāng)我們需要?jiǎng)?chuàng)建大量的進(jìn)程時(shí),利用模塊提供的來創(chuàng)建進(jìn)程。關(guān)閉進(jìn)程池,不再接受進(jìn)的進(jìn)程請求,但已經(jīng)接受的進(jìn)程還是會(huì)繼續(xù)執(zhí)行。
上一篇文章:Python進(jìn)程專題3:繼承Process來創(chuàng)建進(jìn)程
下一篇文章:Python進(jìn)程專題5:進(jìn)程間通信
當(dāng)我們需要?jiǎng)?chuàng)建大量的進(jìn)程時(shí),利用multiprocessing模塊提供的Pool來創(chuàng)建進(jìn)程。
進(jìn)程初始化時(shí),會(huì)指定一個(gè)最大進(jìn)程數(shù)量,當(dāng)有新的請求需要?jiǎng)?chuàng)建進(jìn)程時(shí),如果此時(shí)進(jìn)程池還沒有到達(dá)設(shè)置的最大進(jìn)程數(shù),該進(jìn)程池就會(huì)創(chuàng)建新的進(jìn)程來處理該請求,并把該進(jìn)程放到進(jìn)程池中,如果進(jìn)程池已經(jīng)達(dá)到最大數(shù)量,請求就會(huì)等待,知道進(jìn)程池中進(jìn)程數(shù)量減少,才會(huì)新建進(jìn)程來執(zhí)行請求。
pool=Pool(numprocess,initializer,initargs) numproxess:需要?jiǎng)?chuàng)建的進(jìn)程個(gè)數(shù),如果忽略將使用cpu_count()的值。即系統(tǒng)上的CPU數(shù)量。 initializer:每個(gè)進(jìn)程啟動(dòng)時(shí)都要調(diào)用的對象。 initargs:為 initalizer傳遞的參數(shù)。
multiprocessing.Pool常用函數(shù)解析:
apply_async(要調(diào)用的方法,參數(shù)列表,關(guān)鍵字參數(shù)列表):使用非阻塞方式調(diào)用指定方法,并行執(zhí)行(同時(shí)執(zhí)行) apply(要調(diào)用的方法,參數(shù)列表,關(guān)鍵字參數(shù)列表):使用阻塞方式調(diào)用指定方法,,阻塞方式就是要等上一個(gè)進(jìn)程退出后,下一個(gè)進(jìn)程才開始運(yùn)行。 close():關(guān)閉進(jìn)程池,不再接受進(jìn)的進(jìn)程請求,但已經(jīng)接受的進(jìn)程還是會(huì)繼續(xù)執(zhí)行。 terminate():不管程任務(wù)是否完成,立即結(jié)束。 join():主進(jìn)程堵塞(就是不執(zhí)行join下面的語句),直到子進(jìn)程結(jié)束,注意,該方法必須在close或terminate之后使用。 pool.map(func,iterable,chunksize):將可調(diào)用對象func應(yīng)用給iterable的每一項(xiàng),然后以列表形式返回結(jié)果, 通過將iterable劃分為多塊,并分配給工作進(jìn)程,可以并行執(zhí)行。chunksize指定每塊中的項(xiàng)數(shù), 如果數(shù)據(jù)量較大,可以增大chunksize的值來提升性能。 pool.map_async(func,iterable,chunksize,callback):與map方法不同之處是返回結(jié)果是異步的, 如果callback指定,當(dāng)結(jié)果可用時(shí),結(jié)果會(huì)調(diào)用callback。 pool.imap(func,iterable,chunksize):與map()方法的不同之處是返回迭代器而非列表。 pool.imap_unordered(func,iterable,chunksize):與imap()不同之處是:結(jié)果的順序是根據(jù)從工作進(jìn)程接收到的時(shí)間而定的。 pool.get(timeout):如果沒有設(shè)置timeout,將會(huì)一直等待結(jié)果, 如果設(shè)置了timeout,超過timeout將引發(fā)multiprocessing.TimeoutError異常。 pool.ready():如果調(diào)用完成,返回True pool.successful():如果調(diào)用完成并且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用,jiang引發(fā)AssertionError異常。 pool.wait(timeout):等待結(jié)果變?yōu)榭捎茫瑃imeout為等待時(shí)間。
#阻塞與非阻塞對比 from multiprocessing import Pool import os import time import random#用來生成隨機(jī)數(shù) def test1(name): print("%s運(yùn)行中,pid=%d,父進(jìn)程:%d"%(name,os.getpid(),os.getppid())) t_start=time.time() #random.random()會(huì)生成一個(gè)0——1的浮點(diǎn)數(shù) time.sleep(random.random()*3) t_end=time.time() print("%s執(zhí)行時(shí)間:%0.2f秒"%(name,t_end-t_start)) pool=Pool(5)#設(shè)置線程池中最大線程數(shù)量為5 for xx in range(0,7): #非阻塞運(yùn)行 pool.apply_async(test1,("mark"+str(id),)) print("--start1--") pool.close()#關(guān)閉線程池,關(guān)閉后不再接受進(jìn)的請求 pool.join()#等待進(jìn)程池所有進(jìn)程都執(zhí)行完畢后,開始執(zhí)行下面語句 print("--end1--") print("*"*30) pool=Pool(5)#設(shè)置線程池中最大線程數(shù)量為5 for xx in range(0,7): #阻塞運(yùn)行 pool.apply(test1,("mark"+str(id),)) print("--start2--") pool.close()#關(guān)閉線程池,關(guān)閉后不再接受進(jìn)的請求 pool.join()#等待進(jìn)程池所有進(jìn)程都執(zhí)行完畢后,開始執(zhí)行下面語句 print("--end2--")
結(jié)果:
--start1-- mark運(yùn)行中,pid=28631,父進(jìn)程:28626 mark 運(yùn)行中,pid=28632,父進(jìn)程:28626 mark 運(yùn)行中,pid=28633,父進(jìn)程:28626 mark 運(yùn)行中,pid=28634,父進(jìn)程:28626 mark 運(yùn)行中,pid=28636,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:0.27秒 mark 運(yùn)行中,pid=28633,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:0.32秒 mark 運(yùn)行中,pid=28634,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:0.18秒 mark 執(zhí)行時(shí)間:0.55秒 mark 執(zhí)行時(shí)間:1.78秒 mark 執(zhí)行時(shí)間:1.92秒 mark 執(zhí)行時(shí)間:2.71秒 --end1-- ****************************** mark 運(yùn)行中,pid=28647,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:0.70秒 mark 運(yùn)行中,pid=28648,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:1.66秒 mark 運(yùn)行中,pid=28649,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:2.87秒 mark 運(yùn)行中,pid=28650,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:2.68秒 mark 運(yùn)行中,pid=28651,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:1.42秒 mark 運(yùn)行中,pid=28647,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:1.20秒 mark 運(yùn)行中,pid=28648,父進(jìn)程:28626 mark 執(zhí)行時(shí)間:2.01秒 --start2-- --end2--
查看下面實(shí)例前,先來熟悉一下我們需要用到的一些知識。os.walk(top,topdown=true,onerrorNone,followlinks=false):遍歷目錄地址,返回一個(gè)三元組(root,dirs,files)
top:想要遍歷的目錄
root:當(dāng)前正在遍歷的文件夾地址。
dirs:是一個(gè)list:當(dāng)前文件夾下所有目錄的名字(不包括子目錄)
files:也是一個(gè)list,當(dāng)前文件夾下所有文件的名字(不包括子目錄文件)
topdown:默認(rèn)為True:優(yōu)先遍歷top目錄,為false會(huì)優(yōu)先遍歷top的子目錄。
onerror:指定一個(gè)當(dāng)方法執(zhí)行異常時(shí)需要調(diào)用的方法。
followlinks:默認(rèn)為True:會(huì)遍歷目錄環(huán)境下的快捷方式實(shí)際指向目錄。
代碼:
#遍歷目錄文件并求取SHA512的摘要值 import os import multiprocessing import hashlib #定義進(jìn)程大小 POOLSIZE=2 #工作進(jìn)程的數(shù)量 #可以讀取的緩沖區(qū)大小 BUFSIZE=8196 def mark(filename): try: f=open(filename,"rb") except IOError: return None digest=hashlib.sha512() while True: chunk=f.read(BUFSIZE) if not chunk:break digest.update(chunk) f.close() return filename,digest.digest() def build_map(dir): #定義進(jìn)程 pool=multiprocessing.Pool(POOLSIZE) #os.path.join:拼接文件路徑 #根據(jù)文件目錄和名稱拼接 all_files=(os.path.join(root,name) #循環(huán)遍歷當(dāng)前目錄 for root,dirs,files in os.walk(dir) #取出當(dāng)前目錄下文件名 for name in files) #dict方法用于將結(jié)果轉(zhuǎn)化成字典 map=dict(pool.imap_unordered(mark,all_files,20)) pool.close() return map if __name__=="__main__": digest_map=build_map("/Users/zhaolixiang/Desktop/python/test1/進(jìn)程") for item in digest_map: print("文件目錄:",item) print("SHA512摘要值:",digest_map[item]) #個(gè)數(shù) print(len(digest_map))
結(jié)果
文件目錄: /Users/zhaolixiang/Desktop/python/test1/進(jìn)程/1、fork創(chuàng)建子線程.py
SHA512摘要值: b"x9eYLSxe5xb6xd2xecxd4&xa9xff~m?x87xd2Nxea39Gxe1x8fx9cdx83@x06/Bnxddx1exb5xe0jx10xd1xc9=xfd;y]x8dnR)xbdtxb8xc8xb46rExf8xd7t.xaexbbxe9"
文件目錄: /Users/zhaolixiang/Desktop/python/test1/進(jìn)程/8、JoinableQueue的生產(chǎn)者與消費(fèi)者.py
SHA512摘要值: b"xfe4x18xee8xd1x97xe7vx86}xe6qnx13xf9Dxf1Xxe3xabx94xebx96xfbxedWx0bOn$x14d/+rx9bx0bxc1xd4x03xadxcbbxf7x8dxd5Ccx03yxd1xb4x801,?,rIXxa28xd7"
文件目錄: /Users/zhaolixiang/Desktop/python/test1/進(jìn)程/10、管道:返回?cái)?shù)據(jù).py
SHA512摘要值: b"x85xb1Cxe56x80x1dsx84txf6x95xcbx1d[xdaxe4}n)Y
SHA512摘要值: b"bBxf4lxccxd5xaetrxc1x816xe5xfc;tx85x86xe5xd3x9e~SH]xe6xcbdxc9xfexe9xfbxfcxeb)Axd2ox8cOxbcx1etxe9xe92^xb5x10xe1xf4xc9\_x0cx8cxa7qxf4(x13Mxbe7"
文件目錄: /Users/zhaolixiang/Desktop/python/test1/進(jìn)程/2、multiprocessing.py
SHA512摘要值: b"mxb2xb5x00M1=qx86xb9Gxf9x02xb1x18xfax08xe7,.xefxff5xebt,nx17xbfB-xc6xc2x9dVxf9x88xa7xd8x1ex9b)x86Nxd5xab x89xa0J}xa9xdcxbcx06mx03xa7x87.x17xcb"x93"
文件目錄: /Users/zhaolixiang/Desktop/python/test1/進(jìn)程/6、驗(yàn)證:put方法會(huì)阻塞.py
SHA512摘要值: b"xa3x80x86xf6xc9xb8x0eOB 5xd0x949xfdxc2yx9axc7xcax9dxbc%xa1xe6_xfax84xb6x02$xf7x10xbbxb9Nxfbxdexc8xbe6F xd6x87xacxb2xf5x94x0cxedxe0.xf9Txdax91`;xd7x90x86"
文件目錄: /Users/zhaolixiang/Desktop/python/test1/進(jìn)程/7、closse方法、get方法、put方法簡單使用.py
SHA512摘要值: b"lx98wx8fTx15xdfTx19x91^:xc5xa3x8dxf4x1ex9cx91Exe4xe7xbfxecVx1ex1bx19xa0ix96Lxc6xc4r.x9bxecx88xe9rxfeOx8bxdfAx90x7f6?xe7x1d8xa6Nx07xdex8dxb6xe7#x02p"
文件目錄: /Users/zhaolixiang/Desktop/python/test1/進(jìn)程/5、進(jìn)程池.py
SHA512摘要值: b"xc2xf75x04xbaxa5Xxccx81x88xffxbazxd81@x0bQJ%xbbx15xf3`Qx9a4}xc0x07xa2&axc0x00x0c%xb4[xd2ex18x04x14ytx0cxb0xacx1dxf5xfbxe0xc7xb6$xd2xf1xd9sePx08"
文件目錄: /Users/zhaolixiang/Desktop/python/test1/進(jìn)程/3、multiprocessing.py
SHA512摘要值: b"x04N4Dxd5xab}xcfx03xe6x0fVx0fx910x91xe4x81,xbb-xdfxb36Ixf9!x84Axdf.xf3HVxfdx86:x0bx81<+ex00xd1x17uxf5hxb34FxfexebFxf5x1bxc3x8d`Axa0Ax02x10"
文件目錄: /Users/zhaolixiang/Desktop/python/test1/進(jìn)程/9、管道.py
SHA512摘要值: b"`xdbx8dx90Vx04=x0cxf9x9cxf7{x8fxcax9fxccxb8Dx97xecx82(xd4x9ax84xdb
SHA512摘要值: b"2xc1xa0x1fxd7xd6xb2x1c}x14xdedx8fxdbxedxd0x91mxc1,xb9xdd?TbXx04#2xfcxb8xbfurxabxfcFcx17x18xc7)sYx82x0exea{5x87xf3x8fcxbaPx91r0xefxabLxa8x1ex15"
11
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/42357.html
摘要:上一篇文章進(jìn)程專題創(chuàng)建進(jìn)程下一篇文章進(jìn)程專題進(jìn)程池實(shí)例重新方法下面一句是調(diào)用父類方法,這一本盡量不要少,因?yàn)楦割愡€有很多事情需要在方法內(nèi)處理重寫方法子進(jìn)程運(yùn)行中,,父進(jìn)程子進(jìn)程運(yùn)行結(jié)束,耗時(shí)秒父進(jìn)程開始執(zhí)行父進(jìn)程運(yùn)行結(jié)束,耗時(shí)秒結(jié)果父進(jìn) 上一篇文章:Python進(jìn)程專題2:multiprocessing創(chuàng)建進(jìn)程下一篇文章:Python進(jìn)程專題4:進(jìn)程池Pool 實(shí)例: from mu...
摘要:上一篇文章進(jìn)程專題進(jìn)程池下一篇文章進(jìn)程專題共享數(shù)據(jù)與同步模塊支持的進(jìn)程間通信主要有兩種管道和隊(duì)列。隊(duì)列底層使用管道和鎖,同時(shí)運(yùn)行支持線程講隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐校瑏韺?shí)習(xí)進(jìn)程間通信。 上一篇文章:Python進(jìn)程專題4:進(jìn)程池Pool下一篇文章:Python進(jìn)程專題6:共享數(shù)據(jù)與同步 multiprocessing模塊支持的進(jìn)程間通信主要有兩種:管道和隊(duì)列。一般來說,發(fā)送較少的大...
摘要:限制同時(shí)運(yùn)行線程數(shù)使用類就行,在內(nèi)部管理著一個(gè)計(jì)數(shù)器。當(dāng)計(jì)數(shù)器到時(shí),再調(diào)用就會(huì)阻塞,直到其他線程來調(diào)用,這樣就限制了同時(shí)運(yùn)行線程的數(shù)量。 事前最好了解一下什么是進(jìn)程,什么是線程,什么是GIL,本文不再贅述,直接介紹模塊的使用: 推薦1,推薦2,推薦3,更多自尋 普通的python爬蟲是單進(jìn)程單線程的,這樣在遇到大量重復(fù)的操作時(shí)就只能逐個(gè)進(jìn)行,我們就很難過了。舉個(gè)栗子:你有1000個(gè)...
摘要:目前開發(fā)中有遇到進(jìn)程間需要共享數(shù)據(jù)的情況所以研究了下主要會(huì)以為例子說明下進(jìn)程間共享同一個(gè)父進(jìn)程使用說明創(chuàng)建一個(gè)對象創(chuàng)建一個(gè)創(chuàng)建一個(gè)測試程序創(chuàng)建進(jìn)程池進(jìn)行測試簡單的源碼分析這時(shí)我們再看一個(gè)例子創(chuàng)建一個(gè)對象創(chuàng)建一個(gè)創(chuàng)建一個(gè)測試程序創(chuàng)建進(jìn)程池進(jìn)行 目前開發(fā)中有遇到進(jìn)程間需要共享數(shù)據(jù)的情況. 所以研究了下multiprocessing.Manager, 主要會(huì)以dict為例子, 說明下進(jìn)程間共...
摘要:解決方法有兩種。代碼然而這段代碼只有在運(yùn)行在處的時(shí)候才能用中斷,即前你按有效,一旦后則完全無效建議先確認(rèn)是否真的需要用到多進(jìn)程,如果是多的程序建議用多線程或協(xié)程,計(jì)算特別多則用多進(jìn)程。 本文理論上對multiprocessing.dummy的Pool同樣有效。 python2.x中multiprocessing提供的基于函數(shù)進(jìn)程池,join后陷入內(nèi)核態(tài),按下ctrl+c不能停止所有的進(jìn)...
閱讀 3085·2021-10-27 14:15
閱讀 3085·2021-09-07 10:18
閱讀 1391·2019-08-30 15:53
閱讀 1653·2019-08-26 18:18
閱讀 3442·2019-08-26 12:15
閱讀 3523·2019-08-26 10:43
閱讀 719·2019-08-23 16:43
閱讀 2287·2019-08-23 15:27