亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

python多進程,進程池,數(shù)據(jù)共享,進程通信,分布式進程

系統(tǒng) 1757 0

一、操作系統(tǒng)中相關(guān)進程的知識

??Unix/Linux操作系統(tǒng)提供了一個fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù)調(diào)用,調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因為操作系統(tǒng)自動把當前進程(稱為父進程)復(fù)制了一份(稱為子進程),然后,分別在父進程和子進程內(nèi)返回。 ??子進程永遠返回0,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調(diào)用getppid()就可以拿到父進程的ID。 ??Python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進程。

示例如下

          
            import os
pid=os.fork()
if pid==0:
    print('I am child process %s my parents is %s'%(os.getpid(),os.getppid()))
else:
    print('I (%s) just created a child process (%s).'%(os.getpid(),pid))
          
        

輸出如下

          
            I (64225) just created a child process (64226).
I am child process 64226 my parents is 64225
          
        

二、跨平臺模塊multiprocessing

multiprocessing 模塊提供了一個Process類來代表一個進程對象。
示例1

          
            from multiprocessing import Process
import os

# 子進程要執(zhí)行的代碼
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getppid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
#join()方法可以等待子進程結(jié)束后再繼續(xù)往下運行,通常用于進程間的同步。  
          
        

示例2

          
            from multiprocessing import Process
import time
import os
class P(Process):
    def run(self):
        print('Run child process %s (%s)...'%(self.name,os.getpid()))  # 默認函數(shù)對象有name方法 ,結(jié)果為:P-1
        time.sleep(3)
        print('%s is done' % self.name)
if __name__ == '__main__':
    print('Parent process %s.' % os.getppid())
    p=P()
    p.start()
    p.join()
          
        

三、進程數(shù)據(jù)隔離

多個進程間的數(shù)據(jù)是隔離的,也就是說多個進程修改全局變量互不影響
驗證示例

          
            from multiprocessing import Process
import time
x=100
def task():
    global x
    print('子進程開啟,當前x的值為%d'%x)
    time.sleep(3)
    x=10
    print('子進程結(jié)束,當前x的值為%d'%x)

if __name__ == '__main__':
    print('當前為父進程,準備開啟子進程,x的值為%d' % x)
    p1=Process(target=task)
    p1.start()
    p1.join()
    print('當前為父進程,準備結(jié)束父進程,x的值為%d' % x)
          
        

輸出

          
            當前為父進程,準備開啟子進程,x的值為100
子進程開啟,當前x的值為100
子進程結(jié)束,當前x的值為10
當前為父進程,準備結(jié)束父進程,x的值為100
          
        

== 注意:有些情況是需要加鎖的情況,如文件讀寫問題 ==

四、多進程并行執(zhí)行

示例如下

          
            import time
from multiprocessing import Process

def task(name,n):
    print('%s is running'%name)
    time.sleep(n)
    print('%s is done'%name)

if __name__ == '__main__':
    p1=Process(target=task,args=("進程1",1)) #用時1s
    p2=Process(target=task,args=("進程2",2)) #用時1s
    p3=Process(target=task,args=("進程3",3)) #用時1s
    
    start_time=time.time()
    p1.start()
    p2.start()
    p3.start()
    # 當?shù)谝幻朐谶\行p1時,其實p2、p3也已經(jīng)在運行,當1s后到p2時只需要再運行1s就到p3了,到p3也是一樣。
    p1.join()
    p2.join()
    p3.join()
    stop_time=time.time()     
    print(stop_time-start_time) #3.2848567962646484
          
        

五、進程池

1、線性執(zhí)行( pool.apply() )

          
            from multiprocessing import Pool  # 導(dǎo)入進程池模塊pool
import time,os
def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印進程號
if __name__ == "__main__":
    pool = Pool(processes=5)   # 設(shè)置允許進程池同時放入5個進程
    for i in range(10):
        pool.apply(func=foo, args=(i,))   # 同步執(zhí)行掛起進程
    print('end')
    pool.close() # 關(guān)閉進程池,不再接受新進程
    pool.join()  # 進程池中進程執(zhí)行完畢后再關(guān)閉,如果注釋掉,那么程序直接關(guān)閉。
          
        

2、并發(fā)執(zhí)行( pool.apply_async() )

          
            from multiprocessing import Pool  # 導(dǎo)入進程池模塊pool
import time,os
def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印進程號
if __name__ == "__main__":
    pool = Pool(processes=5)   # 設(shè)置允許進程池同時放入5個進程,并且將這5個進程交給cpu去運行
    for i in range(10):
        pool.apply_async(func=foo, args=(i,))   # 采用異步方式執(zhí)行foo函數(shù)
    print('end')
    pool.close()
    pool.join()  # 進程池中進程執(zhí)行完畢后再關(guān)閉,如果注釋掉,那么程序直接關(guān)閉。
          
        

3、設(shè)置回調(diào)

          
            from multiprocessing import Process,Pool
import time,os
def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印子進程的進程號
def bar(arg):#注意arg參數(shù)是必須要有的
    print('-->exec done:', arg, os.getpid())   # 打印進程號
 
if __name__ == "__main__":
    pool = Pool(processes=2)
    print("主進程", os.getpid())   # 主進程的進程號
    for i in range(3):
        pool.apply_async(func=foo, args=(i,), callback=bar)   # 執(zhí)行回調(diào)函數(shù)callback=Bar
    print('end')
    pool.close()
    pool.join()  # 進程池中進程執(zhí)行完畢后再關(guān)閉,如果注釋掉,那么程序直接關(guān)閉。
          
        

執(zhí)行結(jié)果

          
            主進程 752
end
in process 2348
-->exec done: None 752
in process 8364
-->exec done: None 752
in process 2348
-->exec done: None 752
#回調(diào)函數(shù)說明fun=Foo干不完就不執(zhí)行bar函數(shù),等Foo執(zhí)行完就去執(zhí)行Bar
#這個回調(diào)函數(shù)是主進程去調(diào)用的,而不是每個子進程去調(diào)用的。
          
        

六、子進程

1、 很多時候子進程是一個外部進程,如執(zhí)行一條命令,這和命令行執(zhí)行效果是一樣的 ?? 示例如下

          
            import subprocess
print('$nslookup https://www.baidu.com')
r = subprocess.call(['nslookup','https://www.baidu.com'])
print('Exit code',r)
          
        

2、 有時候子進程還需要進行輸入,可以通過 communicate 方法來輸入 ?? 示例如下

          
            import subprocess
print('$ nslookup https://www.baidu.com')
p = subprocess.Popen(['nslookup'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
output,err = p.communicate(b'set q=mx\nbaidu.com\nexit\n')
print(output.decode('gbk'))
print('Exit code:',p.returncode)
          
        

輸出如下

          
            $ nslookup https://www.baidu.com
默認服務(wù)器:  bogon
Address:  192.168.111.1

> > 服務(wù)器:  bogon
Address:  192.168.111.1

baidu.com   MX preference = 10, mail exchanger = mx.maillb.baidu.com
baidu.com   MX preference = 20, mail exchanger = jpmx.baidu.com
baidu.com   MX preference = 15, mail exchanger = mx.n.shifen.com
baidu.com   MX preference = 20, mail exchanger = mx50.baidu.com
baidu.com   MX preference = 20, mail exchanger = mx1.baidu.com
> 
Exit code: 0
          
        

七、守護進程

守護進程在主進程代碼執(zhí)行完畢時立刻掛掉,然后主進程等待非守護進程執(zhí)行完畢后回收子進程的資源(避免產(chǎn)生僵尸進程),整體才算結(jié)束。
示例

          
            from multiprocessing import Process
import os
import time

def task(x):
    print('%s is running ' %x)
    time.sleep(3)
    print('%s is done' %x)

if __name__ == '__main__':
    p1=Process(target=task,args=('守護進程',))
    p2=Process(target=task,args=('子進程',))
    p2.start()
    p1.daemon=True   # 設(shè)置p1為守護進程
    p1.start()
    print('主進程代碼執(zhí)行完畢')

>>:主進程代碼執(zhí)行完畢
>>:子進程 is running
>>:子進程 is done
          
        

== 可以從結(jié)果看出,主進程代碼執(zhí)行完,守護進程立即掛掉,主進程在等待子進程執(zhí)行完畢后退出 ==

八、進程間通信

??如果想要進程間通信可以使用 Queue Pipe 來實現(xiàn) ?? 使用Queue示例

          
            from multiprocessing import Queue,Process
def put_id(q):
     q.put([1,2,3,4])
if __name__ == '__main__':
     q=Queue()
     p=Process(target=put_id,args=(q,))
     p.start()
     print(q.get())
     p.join()
# 輸出
[1,2,3,4]
          
        

== 注意:在這需要從multiprocessing導(dǎo)入Queue模塊 ==

使用Pipe示例

          
            from multiprocessing import Process,Pipe
def put_id(conn):
    conn.send([1,2,3])
    conn.send([4,5,6])
    conn.close()
    
if __name__ == '__main__':
    ## 生成管道。 生成時會產(chǎn)生兩個返回對象,這兩個對象相當于兩端的電話,通過管道線路連接。
    ## 兩個對象分別交給兩個變量。
    parent_conn,child_conn=Pipe()
    p=Process(target=put_id,args=(child_conn,))#child_conn需要傳給對端,用于send數(shù)據(jù)給parent_conn
    p.start()
    print(parent_conn.recv())  # parent_conn在這斷用于接收數(shù)據(jù)>>>>[1,2,3]
    print(parent_conn.recv())  # parent_conn在這斷用于接收數(shù)據(jù)>>>>[4,5,6]
    p.join()
          
        

== 注意兩端要發(fā)送次數(shù)和接受次數(shù)要對等,不然會卡住直到對等 ==

九、進程間數(shù)據(jù)共享(字典和列表型)

??前面說過,進程間數(shù)據(jù)是隔離的,如果想要進程間數(shù)據(jù)共享可以通過 Manager 來實現(xiàn) ?? 示例如下

          
            from multiprocessing import Manager,Process
from random import randint
import os
def run(d,l):
    d[randint(1,50)]=randint(51,100)#生成一個可在多個進程之間傳遞和共享的字典
    l.append(os.getpid())
    print(l)
if __name__ == '__main__':
    with Manager() as manage: #做一個別名,此時manager就相當于Manager()
        d=manage.dict()#生成一個可在多個進程之間傳遞和共享的字典
        l=manage.list(range(5))#生成一個可在多個進程之間傳遞和共享的列表
        p_list=[]
        for i in range(10):#生成10個進程
            p=Process(target=run,args=(d,l))
            p_list.append(p)# 將每個進程放入空列表中
            p.start()
        for i in p_list:
            i.join()
        print(d)#所有進程都執(zhí)行完畢后打印字典
        print(l)#所有進程都執(zhí)行完畢后打印列表
          
        

十、分布式進程

??在做分布式計算時顯然進程比線程各合適,一來進程更穩(wěn)定,二來線程最多只能在同一臺機器的多個cpu上運行; ?? multiprocessing managers 子模塊支持把多進程分布到多個機器上,一個服務(wù)進程用作調(diào)度者,依靠網(wǎng)絡(luò)將任務(wù)分布到其它多個進程中。 ??假設(shè)有一個需求,擁有兩臺機器,一臺機器用來做發(fā)送任務(wù)的服務(wù)進程,一臺用來做處理任務(wù)的服務(wù)進程; ?? 示例如下

          
            # task_master.py
from multiprocessing.managers import BaseManager
from queue import Queue
import random
import time

task_queue = Queue()
result_queue = Queue()

class QueueManager(BaseManager):
        pass

def get_task_queue():
    global task_queue
    return task_queue


def get_result_queue():
    global result_queue
    return result_queue


if __name__ == '__main__':
    # 將兩個隊列注冊到網(wǎng)絡(luò)上,calltable參數(shù)關(guān)聯(lián)Queue對象
    QueueManager.register('get_task_queue', callable=get_task_queue)
    QueueManager.register('get_result_queue', callable=get_result_queue)

    # 創(chuàng)建一個隊列管理器,綁定端口5000,設(shè)定密碼為abc
    manager = QueueManager(address=('127.0.0.1',5000),authkey=b'abc')
    manager.start()

    # 通過網(wǎng)絡(luò)獲取Queue對象
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    # 放任務(wù)進去
    for i in range(10):
        n = random.randint(0,1000)
        print('Put Task %d'%n)
        task.put(n)

    # 從結(jié)果隊列獲取結(jié)果
    print('Try get results')
    for i in range(10):
        r = result.get()
        print('Result: %s' % r)

    manager.shutdown()
    print('master exit')
          
        

== 注意:一定要用注冊過的Queue對象,另外在linux/unix/mac等系統(tǒng)上注冊可直接使用 QueueManager.register('get_result_queue', callable=lambda : result_queue) ==

          
            # task_worker.py
from multiprocessing.managers import BaseManager
from queue import Queue
from queue import Empty
import time

class QueueManager(BaseManager):
    pass

if __name__ == '__main__':
    # 從服務(wù)器上獲取,所以注冊時只需要提供名字,也就是接口名字
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')

    # 連接到服務(wù)器,也就是task_master.py的機器
    server_addr = '127.0.0.1'
    manager = QueueManager(address=(server_addr,5000),authkey=b'abc')
    manager.connect()

    # 獲取Queue對象
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    # 從隊列提取任務(wù),將處理結(jié)果插入result隊列
    for i in range(10):
        try:
            n = task.get(timeout=1)
            print('run task %d*%d'%(n,n))
            r = '%d * %d = %d'%(n,n,n*n)
            time.sleep(1)
            result.put(r)
        except Empty:
            print('task queue is empty')
    print('worker exit')
          
        

更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 99视频久久精品久久 | 破外女出血一级毛片 | 伊人伊狠亚洲综合影院 | 亚洲欧美日韩第一页 | 91视频这里只有精品 | 青草操| 成人国产精品一级毛片了 | 午夜色影院 | 男人都懂www深夜免费网站 | 波多野结衣中文字幕一区 | 国产成人精品亚洲2020 | 欧美亚洲国产成人精品 | 91国内精品视频 | 吃奶japanesevideo 处videossex第一次中 | 色综合天天综合中文网 | 国产午夜亚洲精品国产 | 在线亚洲国产精品区 | 精品理论片一区二区三区 | 91久久综合九色综合欧美98 | 97国产精品 | 一级毛片www | 亚洲综合激情视频 | 久久福利小视频 | 国产高清精品自在久久 | 亚洲午夜一区二区三区 | 国产大片中文字幕在线观看 | 国产片欧美片亚洲片久久综合 | 中文字幕在线永久 | 日本一区二区在线 | 四虎精品影院永久在线播放 | 亚洲在线小视频 | 久久se精品动漫一区二区三区 | 亚洲欧美精品中文字幕 | 亚州毛色毛片免费观看 | 国产理论自拍 | 最近中文字幕无吗免费视频 | 亚洲国产日韩在线一区 | a毛片免费播放全部完整 | 欧美18—19sex性护士 | 狠狠色丁香婷婷久久综合不卡 | 男人资源在线 |