在利用Python進(jìn)行系統(tǒng)管理的時(shí)候,特別是同時(shí)操作多個文件目錄,或者遠(yuǎn)程控制多臺主機(jī),并行操作可以節(jié)約大量的時(shí)間。當(dāng)被操作對象數(shù)目不大時(shí),可以直接利用multiprocessing中的Process動態(tài)成生多個進(jìn)程,10幾個還好,但如果是上百個,上千個目標(biāo),手動的去限制進(jìn)程數(shù)量卻又太過繁瑣,這時(shí)候進(jìn)程池Pool發(fā)揮作用的時(shí)候就到了。
????? Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時(shí),如果池還沒有滿,
那么就會創(chuàng)建一個新的進(jìn)程用來執(zhí)行該請求; 但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,
那么該請求就會等待,直到池中有進(jìn)程結(jié)束,
才會創(chuàng)建新的進(jìn)程來它。 這里有一個簡單的例子:
?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
? ?
?
? ?
? ?
|
?
先創(chuàng)建容量為3的進(jìn)程池,然后將f(i)依次傳遞給它,運(yùn)行腳本后利用ps aux | grep pool.py查看進(jìn)程情況,會發(fā)現(xiàn)最多只會有三個進(jìn)程執(zhí)行。pool.apply_async()用來向進(jìn)程池提交目標(biāo)請求,pool.join()是用來等待進(jìn)程池中的worker進(jìn)程執(zhí)行完畢,防止主進(jìn)程在worker進(jìn)程結(jié)束前結(jié)束。但必pool.join()必須使用在pool.close()或者pool.terminate()之后。其中close()跟terminate()的區(qū)別在于close()會等待池中的worker進(jìn)程執(zhí)行結(jié)束再關(guān)閉pool,而terminate()則是直接關(guān)閉。result.successful()表示整個調(diào)用執(zhí)行的狀態(tài),如果還有worker沒有執(zhí)行完,則會拋出AssertionError異常。
????利用multiprocessing下的Pool可以很方便的同時(shí)自動處理幾百或者上千個并行操作,腳本的復(fù)雜性也大大降低.
?
python中multiprocessing.pool函數(shù)介紹
?
一?apply(func[, args[, kwds]])
???apply用于傳遞不定參數(shù),同python中的apply函數(shù)一致(不過內(nèi)置的apply函數(shù)從2.3以后就不建議使用了),主進(jìn)程會阻塞于函數(shù)。
for x in gen_list(l):
????result = pool.apply(pool_test, (x,))
????print 'main process'
這個時(shí)候主進(jìn)程的執(zhí)行流程同單進(jìn)程一致
二?apply_async(func[, args[, kwds[, callback]]])
???與apply用法一致,但它是非阻塞的且支持結(jié)果返回后進(jìn)行回調(diào)。
for x in gen_list(l):
????result = pool.apply_async(pool_test, (x,))
????print 'main process'
???這個時(shí)候主進(jìn)程循環(huán)運(yùn)行過程中不等待apply_async的返回結(jié)果,在主進(jìn)程結(jié)束后,即使子進(jìn)程還未返回整個程序也會就退出。雖然 apply_async是非阻塞的,但其返回結(jié)果的get方法卻是阻塞的,在本例中result.get()會阻塞主進(jìn)程。因此可以這樣來處理返回結(jié)果:
????[x.get() for x in [pool.apply_async(pool_test, (x,)) for x in gen_list(l)]]
如果我們對返回結(jié)果不感興趣, 那么可以在主進(jìn)程中使用pool.close與pool.join來防止主進(jìn)程退出。注意join方法一定要在close或terminate之后調(diào)用。
????for x in gen_list(l):
????pool.apply_async(pool_test, (x, ))
????print 'main_process'
????pool.close()
????pool.join()
三?map(func, iterable[, chunksize])
???
map方法與內(nèi)置的map函數(shù)行為基本一致,在它會使進(jìn)程阻塞與此直到結(jié)果返回。
???但需注意的是其第二個參數(shù)雖然描述的為iterable, 但在實(shí)際使用中發(fā)現(xiàn)只有在整個隊(duì)列全部就緒后,程序才會運(yùn)行子進(jìn)程。
四?map_async(func, iterable[, chunksize[, callback]])
???與map用法一致,但是它是非阻塞的。其有關(guān)事項(xiàng)見apply_async。
五?imap(func, iterable[, chunksize])
???與map不同的是, imap的返回結(jié)果為iter,需要在主進(jìn)程中主動使用next來驅(qū)動子進(jìn)程的調(diào)用。即使子進(jìn)程沒有返回結(jié)果,主進(jìn)程對于gen_list(l)的 iter還是會繼續(xù)進(jìn)行, 另外根據(jù)python2.6文檔的描述,對于大數(shù)據(jù)量的iterable而言,將chunksize設(shè)置大一些比默認(rèn)的1要好。
???for x in pool.imap(pool_test, gen_list(l)):
???????pass
六?imap_unordered(func, iterable[, chunksize])
???同imap一致,只不過其并不保證返回結(jié)果與迭代傳入的順序一致。
七?close()
???關(guān)閉pool,使其不在接受新的任務(wù)。
八?terminate()
???結(jié)束工作進(jìn)程,不在處理未處理的任務(wù)。
九?join()
???主進(jìn)程阻塞等待子進(jìn)程的退出, join方法要在close或terminate之后使用。
?
l = range(10)
def gen_list(l):
????for x in l:
????????print 'yield', x
????????yield x
def pool_test(x):
????print 'f2', x
????time.sleep(1)
?
我可以輸,但我不會認(rèn)輸
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
