亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Python實現簡單多線程任務隊列

系統 1776 0

最近我在用梯度下降算法繪制神經網絡的數據時,遇到了一些算法性能的問題。梯度下降算法的代碼如下(偽代碼):

            
def gradient_descent():
  # the gradient descent code
  plotly.write(X, Y)


          

一般來說,當網絡請求 plot.ly 繪圖時會阻塞等待返回,于是也會影響到其他的梯度下降函數的執行速度。

一種解決辦法是每調用一次 plotly.write 函數就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分布式任務隊列)一樣大而全的任務隊列框架,因為框架對于我的這點需求來說太重了,并且我的繪圖也并不需要 redis 來持久化數據。

那用什么辦法解決呢?我在 python 中寫了一個很小的任務隊列,它可以在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。

            
from threading import Thread
import Queue 
import time

class TaskQueue(Queue.Queue):


          

首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。

            
def __init__(self, num_workers=1):
  Queue.Queue.__init__(self)
  self.num_workers = num_workers
  self.start_workers()


          

初始化的時候,我們可以不用考慮工作線程的數量。

            
def add_task(self, task, *args, **kwargs):
  args = args or ()
  kwargs = kwargs or {}
  self.put((task, args, kwargs))


          

我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數量不等的參數,**kwargs 可以傳遞命名參數。

            
def start_workers(self):
  for i in range(self.num_workers):
    t = Thread(target=self.worker)
    t.daemon = True
    t.start()


          

我們為每個 worker 創建一個線程,然后在后臺刪除。

下面是 worker 函數的代碼:

            
def worker(self):
  while True:
    tupl = self.get()
    item, args, kwargs = self.get()
    item(*args, **kwargs) 
    self.task_done()


          

worker 函數獲取隊列頂端的任務,并根據輸入參數運行,除此之外,沒有其他的功能。下面是隊列的代碼:

我們可以通過下面的代碼測試:

            
def blokkah(*args, **kwargs):
  time.sleep(5)
  print “Blokkah mofo!”

q = TaskQueue(num_workers=5)

for item in range(1):
  q.add_task(blokkah)

q.join() # wait for all the tasks to finish.

print “All done!”


          

Blokkah 是我們要做的任務名稱。隊列已經緩存在內存中,并且沒有執行很多任務。下面的步驟是把主隊列當做單獨的進程來運行,這樣主程序退出以及執行數據庫持久化時,隊列任務不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務寫成像工作隊列這樣復雜的程序。

            
def gradient_descent():
  # the gradient descent code
  queue.add_task(plotly.write, x=X, y=Y)


          

修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。

            
from threading import Thread
import Queue
import time

class TaskQueue(Queue.Queue):

def __init__(self, num_workers=1):
Queue.Queue.__init__(self)
self.num_workers = num_workers
self.start_workers()

def add_task(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or {}
self.put((task, args, kwargs))

def start_workers(self):
for i in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()

def worker(self):
while True:
tupl = self.get()
item, args, kwargs = self.get()
item(*args, **kwargs)
self.task_done()

def tests():
def blokkah(*args, **kwargs):
time.sleep(5)
print "Blokkah mofo!"

q = TaskQueue(num_workers=5)

for item in range(10):
q.add_task(blokkah)

q.join() # block until all tasks are done
print "All done!"

if __name__ == "__main__":
tests()

          


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 九九热国产 | 国内精品久久影视 | 黄色影院在线观看 | 在线观看中文字幕 | 久久国产精品自线拍免费 | 亚洲精品久久精品h成人 | 仑乱高清在线一级播放 | 在线观看中文字幕 | 91视频毛片 | 精品午夜国产在线观看不卡 | 成人欧美一区二区三区黑人 | 国产真实乱子伦精品 | 国产精品久久久亚洲第一牛牛 | 国产涩| 日韩福利影院 | 日韩精品一区二三区中文 | 国产精品午夜久久久久久99热 | 深夜网站在线观看 | 国产精品99r8在线观看 | 亚洲香蕉毛片久久网站老妇人 | 深夜福利国产精品亚洲尤物 | 欧美一级毛片视频 | 久久99精品久久久久久久野外 | 欧美色图一区二区 | 亚洲欧美精品国产一区色综合 | 免费超级淫片日本高清视频 | 超碰人人操 | 久久婷婷一区二区三区 | 日本日韩欧美 | 亚洲视频播放 | 久久福利免费视频 | 一a一级片 | 国产一毛片 | 手机看片高清国产日韩片 | 强制高潮18xxxxhd日韩 | 91福利视频网站 | 国产成人精品亚洲一区 | 亚洲国产精品热久久 | 日韩欧美理论 | 狠狠色噜噜狠狠狠狠97影音先锋 | 香蕉在线播放 |