亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

python異步實(shí)現(xiàn)定時(shí)任務(wù)和周期任務(wù)的方法

系統(tǒng) 1756 0

一. 如何調(diào)用

            
def f1(arg1, arg2):
  print('f1', arg1, arg2)
 
 
def f2(arg1):
  print('f2', arg1)
 
 
def f3():
  print('f3')
 
 
def f4():
  print('周期任務(wù)', int(time.time()))
 
 
timer = TaskTimer()
# 把任務(wù)加入任務(wù)隊(duì)列
timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30執(zhí)行
timer.join_task(f2, [3], timing=14) # 每天14:00執(zhí)行
timer.join_task(f3, [], timing=15) # 每天15:00執(zhí)行
timer.join_task(f4, [], interval=10) # 每10秒執(zhí)行1次
# 開始執(zhí)行(此時(shí)才會(huì)創(chuàng)建線程)
timer.start()

          

f1~f4是我們需要定時(shí)執(zhí)行的函數(shù)。

首先創(chuàng)建TaskTimer對(duì)象( TaskTimer的代碼在下面 )。調(diào)用join_task函數(shù),把需要執(zhí)行的函數(shù)加入到任務(wù)隊(duì)列。最后調(diào)用start,任務(wù)就開始執(zhí)行了。

join_task參數(shù):

fun:需要執(zhí)行的函數(shù)

arg:fun的參數(shù),如果沒有就傳一個(gè)空列表

interval:如果有此參數(shù),說明任務(wù)是周期任務(wù),單位為秒(注意interval最少5秒)

timing:如果有此參數(shù),說明任務(wù)是定時(shí)任務(wù),單位為時(shí)

注意:interval和timing只能選填1個(gè)

二. 源碼

            
import datetime
import time
from threading import Thread
from time import sleep
 
 
class TaskTimer:
  __instance = None
 
  def __new__(cls, *args, **kwargs):
    """
    單例模式
    """
    if not cls.__instance:
      cls.__instance = object.__new__(cls)
    return cls.__instance
 
  def __init__(self):
    if not hasattr(self, 'task_queue'):
      setattr(self, 'task_queue', [])
 
    if not hasattr(self, 'is_running'):
      setattr(self, 'is_running', False)
 
  def write_log(self, level, msg):
    cur_time = datetime.datetime.now()
    with open('./task.log', mode='a+', encoding='utf8') as file:
      s = "[" + str(cur_time) + "][" + level + "]  " + msg
      print(s)
      file.write(s + "\n")
 
  def work(self):
 
    """
    處理任務(wù)隊(duì)列
    """
    while True:
      for task in self.task_queue:
        if task['interval']:
          self.cycle_task(task)
        elif task['timing']:
          self.timing_task(task)
 
      sleep(5)
 
  def cycle_task(self, task):
    """
    周期任務(wù)
    """
    if task['next_sec'] <= int(time.time()):
      try:
        task['fun'](*task['arg'])
        self.write_log("正常", "周期任務(wù):" + task['fun'].__name__ + " 已執(zhí)行")
      except Exception as e:
        self.write_log("異常", "周期任務(wù):" + task['fun'].__name__ + " 函數(shù)內(nèi)部異常:" + str(e))
      finally:
        task['next_sec'] = int(time.time()) + task['interval']
 
  def timing_task(self, task):
    """
    定時(shí)任務(wù)
    """
    # 今天已過秒數(shù)
    today_sec = self.get_today_until_now()
 
    # 到了第二天,就重置任務(wù)狀態(tài)
    if task['today'] != self.get_today():
      task['today'] = self.get_today()
      task['today_done'] = False
 
    # 第一次執(zhí)行
    if task['first_work']:
      if today_sec >= task['task_sec']:
        task['today_done'] = True
        task['first_work'] = False
      else:
        task['first_work'] = False
 
    # 今天還沒有執(zhí)行
    if not task['today_done']:
      if today_sec >= task['task_sec']: # 到點(diǎn)了,開始執(zhí)行任務(wù)
        try:
          task['fun'](*task['arg'])
          self.write_log("正常", "定時(shí)任務(wù):" + task['fun'].__name__ + " 已執(zhí)行")
        except Exception as e:
          self.write_log("異常", "定時(shí)任務(wù):" + task['fun'].__name__ + " 函數(shù)內(nèi)部異常:" + str(e))
        finally:
          task['today_done'] = True
          if task['first_work']:
            task['first_work'] = False
 
  def get_today_until_now(self):
    """
    獲取今天凌晨到現(xiàn)在的秒數(shù)
    """
    i = datetime.datetime.now()
    return i.hour * 3600 + i.minute * 60 + i.second
 
  def get_today(self):
    """
    獲取今天的日期
    """
    i = datetime.datetime.now()
    return i.day
 
  def join_task(self, fun, arg, interval=None, timing=None):
    """
    interval和timing只能存在1個(gè)
    :param fun: 你要調(diào)用的任務(wù)
    :param arg: fun的參數(shù)
    :param interval: 周期任務(wù),單位秒
    :param timing: 定時(shí)任務(wù),取值:[0,24)
    """
    # 參數(shù)校驗(yàn)
    if (interval != None and timing != None) or (interval == None and timing == None):
      raise Exception('interval和timing只能選填1個(gè)')
 
    if timing and not 0 <= timing < 24:
      raise Exception('timing的取值范圍為[0,24)')
 
    if interval and interval < 5:
      raise Exception('interval最少為5')
 
    # 封裝一個(gè)task
    task = {
      'fun': fun,
      'arg': arg,
      'interval': interval,
      'timing': timing,
    }
    # 封裝周期或定時(shí)任務(wù)相應(yīng)的參數(shù)
    if timing:
      task['task_sec'] = timing * 3600
      task['today_done'] = False
      task['first_work'] = True
      task['today'] = self.get_today()
    elif interval:
      task['next_sec'] = int(time.time()) + interval
 
    # 把task加入任務(wù)隊(duì)列
    self.task_queue.append(task)
 
    self.write_log("正常", "新增任務(wù):" + fun.__name__)
 
  def start(self):
    """
    開始執(zhí)行任務(wù)
    返回線程標(biāo)識(shí)符
    """
    if not self.is_running:
      thread = Thread(target=self.work)
 
      thread.start()
 
      self.is_running = True
 
      self.write_log("正常", "TaskTimer已開始運(yùn)行!")
 
      return thread.ident
 
    self.write_log("警告", "TaskTimer已運(yùn)行,請勿重復(fù)啟動(dòng)!")

          

以上這篇python異步實(shí)現(xiàn)定時(shí)任務(wù)和周期任務(wù)的方法就是小編分享給大家的全部內(nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 日本一级毛片视频无遮挡免费 | 91日本视频 | 内衣办公室动漫久久影院 | 在线观看片成人免费视频 | 国产高清在线精品一区二区三区 | 亚洲日本人成中文字幕 | 亚洲精品午夜级久久久久 | 黄色网欧美 | 人人爱人人草 | 成人在线视频免费观看 | 免费人成激情视频在线观看冫 | 在线小视频国产 | 久久国产热这里只有精品8 久久国产三级 | 亚洲精品人成无码中文毛片 | 五月天久久婷婷 | 久久亚洲精品久久久久 | 亚洲综合色丁香麻豆 | 在线免费黄色网址 | 色婷婷精品大全在线视频 | 久久综合一区二区三区 | 欧美成人性做爰网站免费 | 国产啪爱视频精品免视 | 夜夜女人国产香蕉久久精品 | 国产精品剧情原创麻豆国产 | 伊人久久国产精品 | 狠狠躁夜夜躁人人爽天天miya | 久久草在线观看 | 亚洲国产精品二区久久 | 奇米在线影视 | 波多野结衣免费播放 | 操穴网 | 国产亚洲欧美日韩在线看片 | 色偷偷尼玛图亚洲综合 | 久久精品国产精品亚洲艾 | 久久久麻豆 | 伊人色综 | 亚洲国产精品乱码一区二区三区 | 日本狠狠操 | 五月天在线免费视频 | 久青草国产在视频在线观看 | 久久精品精品 |