亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Python定時任務(wù)工具之APScheduler使用方式

系統(tǒng) 1779 0

APScheduler (advanceded python scheduler)是一款Python開發(fā)的定時任務(wù)工具。

文檔地址?apscheduler.readthedocs.io/en/latest/u…

特點:

  • 不依賴于Linux系統(tǒng)的crontab系統(tǒng)定時,獨立運行
  • 可以?動態(tài)添加 新的定時任務(wù),如下單后30分鐘內(nèi)必須支付,否則取消訂單,就可以借助此工具(每下一單就要添加此訂單的定時任務(wù))
  • 對添加的定時任務(wù)可以做持久保存

1 安裝

pip install apscheduler

2 組成

  • APScheduler 由以下四部分組成:
  • triggers 觸發(fā)器 指定定時任務(wù)執(zhí)行的時機(jī)
  • job stores 存儲器 可以將定時持久存儲
  • executors 執(zhí)行器 在定時任務(wù)該執(zhí)行時,以進(jìn)程或線程方式執(zhí)行任務(wù)
  • schedulers 調(diào)度器 常用的有BackgroundScheduler(?后臺運行 )和BlockingScheduler(?阻塞式 )

3 使用方式

            
from apscheduler.schedulers.background import BlockingScheduler
#
 創(chuàng)建定時任務(wù)的調(diào)度器對象
scheduler = BlockingScheduler()
# 創(chuàng)建執(zhí)行器
executors = {
 'default': ThreadPoolExecutor(20),
}
# 定義定時任務(wù)
def my_job(param1, param2): # 參數(shù)通過add_job()args傳遞傳遞過來
 print(param1) # 100
 print(param2) # python
# 向調(diào)度器中添加定時任務(wù)
scheduler.add_job(my_job, 'date', args=[100, 'python'], executors=executors)
# 啟動定時任務(wù)調(diào)度器工作
scheduler.start()
          


4 調(diào)度器 Scheduler

負(fù)責(zé)管理定時任務(wù)

BlockingScheduler : 作為獨立進(jìn)程時使用

            
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start() # 此處程序會發(fā)生阻塞
BackgroundScheduler : 在框架程序(如Django、Flask)中使用.
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.start() # 此處程序不會發(fā)生阻塞
          

  • AsyncIOScheduler : 當(dāng)你的程序使用了asyncio的時候使用。
  • GeventScheduler : 當(dāng)你的程序使用了gevent的時候使用。
  • TornadoScheduler : 當(dāng)你的程序基于Tornado的時候使用。
  • TwistedScheduler : 當(dāng)你的程序使用了Twisted的時候使用
  • QtScheduler : 如果你的應(yīng)用是一個Qt應(yīng)用的時候可以使用。

4 執(zhí)行器 executors

在定時任務(wù)該執(zhí)行時,以進(jìn)程或線程方式執(zhí)行任務(wù)

            
ThreadPoolExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
ThreadPoolExecutor(max_workers) 

          

使用方法

            
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
  'default': ThreadPoolExecutor(20) # 最多20個線程同時執(zhí)行
 }
 scheduler = BackgroundScheduler(executors=executors)
ProcessPoolExecutor

from apscheduler.executors.pool import ProcessPoolExecutor
ProcessPoolExecutor(max_workers)
          

使用方法

            
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
  'default': ProcessPoolExecutor(5) # 最多5個進(jìn)程同時執(zhí)行
 }
scheduler = BackgroundScheduler(executors=executors)
          

5 觸發(fā)器 Trigger

指定定時任務(wù)執(zhí)行的時機(jī)。

1) date 在特定的時間日期執(zhí)行

            
from datetime import date
# 在2019年11月6日00:00:00執(zhí)行
sched.add_job(my_job, 'date', run_date=date(2019, 11, 6))
# 在2019年11月6日16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5))
sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05')
# 立即執(zhí)行
sched.add_job(my_job, 'date') 
sched.start()
          

2) interval 經(jīng)過指定的時間間隔執(zhí)行

            
weeks (int) ?C number of weeks to wait
days (int) ?C number of days to wait
hours (int) ?C number of hours to wait
minutes (int) ?C number of minutes to wait
seconds (int) ?C number of seconds to wait
start_date (datetime|str) ?C starting point for the interval calculation
end_date (datetime|str) ?C latest possible date/time to trigger on
timezone (datetime.tzinfo|str) ?C time zone to use for the date/time calculations
from datetime import datetime
# 每兩小時執(zhí)行一次
sched.add_job(job_function, 'interval', hours=2)
# 在2012年10月10日09:30:00 到2014年6月15日11:00:00的時間內(nèi),每兩小時執(zhí)行一次
sched.add_job(job_function, 'interval', hours=2, start_date='2012-10-10 09:30:00', end_date='2014-06-15 11:00:00')
          

3) cron 按指定的周期執(zhí)行

            
year (int|str) ?C 4-digit year

month (int|str) ?C month (1-12)

day (int|str) ?C day of the (1-31)

week (int|str) ?C ISO week (1-53)

day_of_week (int|str) ?C number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)

hour (int|str) ?C hour (0-23)

minute (int|str) ?C minute (0-59)

second (int|str) ?C second (0-59)

start_date (datetime|str) ?C earliest possible date/time to trigger on (inclusive)

end_date (datetime|str) ?C latest possible date/time to trigger on (inclusive)

timezone (datetime.tzinfo|str) ?C time zone to use for the date/time calculations (defaults to scheduler timezone)

# 在6、7、8、11、12月的第三個周五的00:00, 01:00, 02:00和03:00 執(zhí)行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

# 在2014年5月30日前的周一到周五的5:30執(zhí)行
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

          


6.任務(wù)存儲

            
MemoryJobStore 默認(rèn)內(nèi)存存儲
MongoDBJobStore 任務(wù)保存到MongoDB
from apscheduler.jobstores.mongodb import MongoDB
JobStoreMongoDBJobStore()復(fù)制代碼
RedisJobStore 任務(wù)保存到redis
from apscheduler.jobstores.redis import RedisJobStore
RedisJobStore()
          

7 配置方法

方法1

            
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

executors = {
 'default': ThreadPoolExecutor(20),
}
conf = { # redis配置
 "host":127.0.0.1,
 "port":6379,
 "db":15, # 連接15號數(shù)據(jù)庫
 "max_connections":10 # redis最大支持300個連接數(shù)
}
scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(jobstore='redis', **conf) # 添加任務(wù)持久化存儲方式,如果未安裝redis可省略此步驟
          

方法2

            
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
 'default': {'type': 'threadpool', 'max_workers': 20},
 'processpool': ProcessPoolExecutor(max_workers=5)
}
scheduler = BackgroundScheduler()
# .. 此處可以編寫其他代碼
# 使用configure方法進(jìn)行配置
scheduler.configure(executors=executors)
          

8 啟動

scheduler.start()

對于BlockingScheduler ,程序會阻塞在這,防止退出,作為獨立進(jìn)程時使用。(可以用來生成靜態(tài)頁面)

對于BackgroundScheduler,可以在應(yīng)用程序中使用。不再以單獨的進(jìn)程使用。(如30分鐘內(nèi)取消訂單)

9 擴(kuò)展

任務(wù)管理

方式1

            
job = scheduler.add_job(myfunc, 'interval', minutes=2) # 添加任務(wù)
job.remove() # 刪除任務(wù)
job.pause() # 暫定任務(wù)
job.resume() # 恢復(fù)任務(wù)

          

方式2

            
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 添加任務(wù) 
scheduler.remove_job('my_job_id') # 刪除任務(wù)
scheduler.pause_job('my_job_id') # 暫定任務(wù)
scheduler.resume_job('my_job_id') # 恢復(fù)任務(wù)

          

調(diào)整任務(wù)調(diào)度周期

            
job.modify(max_instances=6, name='Alternate name')
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')復(fù)制代碼
停止APScheduler運行
scheduler.shutdown()
          

10 綜合使用

這里提供30分鐘取消訂單支付的思路,可以使用Flask或者Django程序都能實現(xiàn),這里是在django應(yīng)用中動態(tài)的添加一個定時任務(wù),調(diào)度器需要使用BackgroundScheduler。下面先定義執(zhí)行訂單取消的任務(wù)。

            
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BackgroundScheduler
from goods.models import SKU
from orders.models import OrderGoods
def cancel_order_job(order_id, sku_id, stock, sales):
 # 將訂單商品和訂單信息篩選出來
 order_goods = OrderGoods.objects.filter( order_id=order_id, sku_id=sku_id)
 order_goods.delete() # 刪除訂單
 try:
  sku = SKU.objects.get(id=sku_id)
  sku.stock += stock # 訂單刪掉后商品表里的庫存恢復(fù)
  sku.sales -= sales # 商品表里銷量還原
  sku.save()
 except Exception as e:
  print(e)
          


具體操作哪些表要根據(jù)自身表的設(shè)計來定,大致是上面的思路。然后在生成訂單的視圖中同時生成取消訂單的任務(wù)。然后將取消訂單 cancel_order_job() 需要的參數(shù)傳遞過去,注意要判定當(dāng)前訂單的狀態(tài)為未支付狀態(tài)。

            
from datetime import datetime, timedelta

class OrderCommitView(View):

 def post(self, request):
 # ... 此處省略生成訂單相關(guān)邏輯
   if status == OrderInfo.STATUS.UNPADED: # 待支付狀態(tài)
  
    executors = {
    'default': ThreadPoolExecutor(10) 
    }
    now = datetime.now()
    delay = now + timedelta(minutes=30) # 從當(dāng)前下訂單延時30分鐘后
    scheduler = BackgroundScheduler(executors=executors)
    # 添加定時任務(wù)
    scheduler.add_job(cancel_order_job, 'date', run_date=delay,
    args=[order_id, sku.id, sku.stock, sku.sales])
    scheduler.start()
    # ....省略其他業(yè)務(wù)及返回

          


注意: 如果需要周期性的執(zhí)行一個定時任務(wù),如果用到了django中模型類或者Flask的配置信息等相關(guān)信息,需要將框架的配置信息導(dǎo)入。

總結(jié)

以上所述是小編給大家介紹的Python定時任務(wù)工具之APScheduler詳解,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
如果你覺得本文對你有幫助,歡迎轉(zhuǎn)載,煩請注明出處,謝謝!


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 97中文字幕在线 | 无码免费一区二区三区免费播放 | 久久不射网 | 理论片我不卡在线观看 | 国产高清不卡视频 | 欧美日韩国产三级 | 久久伦理片 | 国产精品嫩草影院奶水 | 日本乱人伦片中文字幕三区 | 久青草青综合在线视频 | 亚洲国产精品a一区二区三区 | 色噜噜亚洲精品中文字幕 | 成人免费观看高清在线毛片 | 色中涩 | 一级一片免费看 | 狠狠色丁香久久婷婷综合丁香 | 黄色免费看网站 | 欧美黄色a| 中文字幕热久久久久久久 | 国产精品亚洲综合 | 亚洲五月综合网色九月色 | 色婷婷久久综合中文网站 | 女bbbbxxxx毛片视频0 | 欧美视频色| 8090碰成年女人免费碰碰尤物 | 我想看一级毛片免费的 | 亚洲一区二区精品视频 | 国产欧美日韩精品第三区 | 九九99re在线视频精品免费 | 国产swag在线观看 | 国产免费久久精品久久久 | 亚洲福利一区二区三区 | 国产精品久久久久免费a∨ 国产精品久久久久免费视频 | 成年人看的黄色 | 国产一级在线观看www色 | 全亚洲最大的免费私人影剧院 | 天天干天天干天天 | 欧美一级片 在线播放 | 成人欧美视频在线观看播放 | 欧美精品亚洲一区二区在线播放 | 91av国产在线|