APScheduler (advanceded python scheduler)是一款Python開發(fā)的定時任務(wù)工具。
文檔地址?apscheduler.readthedocs.io/en/latest/u…
特點:
- 不依賴于Linux系統(tǒng)的crontab系統(tǒng)定時,獨立運行
- 可以?動態(tài)添加 新的定時任務(wù),如下單后30分鐘內(nèi)必須支付,否則取消訂單,就可以借助此工具(每下一單就要添加此訂單的定時任務(wù))
- 對添加的定時任務(wù)可以做持久保存
1 安裝
pip install apscheduler
2 組成
- APScheduler 由以下四部分組成:
- triggers 觸發(fā)器 指定定時任務(wù)執(zhí)行的時機(jī)
- job stores 存儲器 可以將定時持久存儲
- executors 執(zhí)行器 在定時任務(wù)該執(zhí)行時,以進(jìn)程或線程方式執(zhí)行任務(wù)
- schedulers 調(diào)度器 常用的有BackgroundScheduler(?后臺運行 )和BlockingScheduler(?阻塞式 )
3 使用方式
from apscheduler.schedulers.background import BlockingScheduler # 創(chuàng)建定時任務(wù)的調(diào)度器對象 scheduler = BlockingScheduler() # 創(chuàng)建執(zhí)行器 executors = { 'default': ThreadPoolExecutor(20), } # 定義定時任務(wù) def my_job(param1, param2): # 參數(shù)通過add_job()args傳遞傳遞過來 print(param1) # 100 print(param2) # python # 向調(diào)度器中添加定時任務(wù) scheduler.add_job(my_job, 'date', args=[100, 'python'], executors=executors) # 啟動定時任務(wù)調(diào)度器工作 scheduler.start()
4 調(diào)度器 Scheduler
負(fù)責(zé)管理定時任務(wù)
BlockingScheduler : 作為獨立進(jìn)程時使用
from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() scheduler.start() # 此處程序會發(fā)生阻塞 BackgroundScheduler : 在框架程序(如Django、Flask)中使用. from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() scheduler.start() # 此處程序不會發(fā)生阻塞
- AsyncIOScheduler : 當(dāng)你的程序使用了asyncio的時候使用。
- GeventScheduler : 當(dāng)你的程序使用了gevent的時候使用。
- TornadoScheduler : 當(dāng)你的程序基于Tornado的時候使用。
- TwistedScheduler : 當(dāng)你的程序使用了Twisted的時候使用
- QtScheduler : 如果你的應(yīng)用是一個Qt應(yīng)用的時候可以使用。
4 執(zhí)行器 executors
在定時任務(wù)該執(zhí)行時,以進(jìn)程或線程方式執(zhí)行任務(wù)
ThreadPoolExecutor from apscheduler.executors.pool import ThreadPoolExecutor ThreadPoolExecutor(max_workers)
使用方法
from apscheduler.executors.pool import ThreadPoolExecutor executors = { 'default': ThreadPoolExecutor(20) # 最多20個線程同時執(zhí)行 } scheduler = BackgroundScheduler(executors=executors) ProcessPoolExecutor from apscheduler.executors.pool import ProcessPoolExecutor ProcessPoolExecutor(max_workers)
使用方法
from apscheduler.executors.pool import ProcessPoolExecutor executors = { 'default': ProcessPoolExecutor(5) # 最多5個進(jìn)程同時執(zhí)行 } scheduler = BackgroundScheduler(executors=executors)
5 觸發(fā)器 Trigger
指定定時任務(wù)執(zhí)行的時機(jī)。
1) date 在特定的時間日期執(zhí)行
from datetime import date # 在2019年11月6日00:00:00執(zhí)行 sched.add_job(my_job, 'date', run_date=date(2019, 11, 6)) # 在2019年11月6日16:30:05 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5)) sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05') # 立即執(zhí)行 sched.add_job(my_job, 'date') sched.start()
2) interval 經(jīng)過指定的時間間隔執(zhí)行
weeks (int) ?C number of weeks to wait days (int) ?C number of days to wait hours (int) ?C number of hours to wait minutes (int) ?C number of minutes to wait seconds (int) ?C number of seconds to wait start_date (datetime|str) ?C starting point for the interval calculation end_date (datetime|str) ?C latest possible date/time to trigger on timezone (datetime.tzinfo|str) ?C time zone to use for the date/time calculations from datetime import datetime # 每兩小時執(zhí)行一次 sched.add_job(job_function, 'interval', hours=2) # 在2012年10月10日09:30:00 到2014年6月15日11:00:00的時間內(nèi),每兩小時執(zhí)行一次 sched.add_job(job_function, 'interval', hours=2, start_date='2012-10-10 09:30:00', end_date='2014-06-15 11:00:00')
3) cron 按指定的周期執(zhí)行
year (int|str) ?C 4-digit year month (int|str) ?C month (1-12) day (int|str) ?C day of the (1-31) week (int|str) ?C ISO week (1-53) day_of_week (int|str) ?C number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) ?C hour (0-23) minute (int|str) ?C minute (0-59) second (int|str) ?C second (0-59) start_date (datetime|str) ?C earliest possible date/time to trigger on (inclusive) end_date (datetime|str) ?C latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) ?C time zone to use for the date/time calculations (defaults to scheduler timezone) # 在6、7、8、11、12月的第三個周五的00:00, 01:00, 02:00和03:00 執(zhí)行 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 在2014年5月30日前的周一到周五的5:30執(zhí)行 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
6.任務(wù)存儲
MemoryJobStore 默認(rèn)內(nèi)存存儲 MongoDBJobStore 任務(wù)保存到MongoDB from apscheduler.jobstores.mongodb import MongoDB JobStoreMongoDBJobStore()復(fù)制代碼 RedisJobStore 任務(wù)保存到redis from apscheduler.jobstores.redis import RedisJobStore RedisJobStore()
7 配置方法
方法1
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ThreadPoolExecutor executors = { 'default': ThreadPoolExecutor(20), } conf = { # redis配置 "host":127.0.0.1, "port":6379, "db":15, # 連接15號數(shù)據(jù)庫 "max_connections":10 # redis最大支持300個連接數(shù) } scheduler = BackgroundScheduler(executors=executors) scheduler.add_jobstore(jobstore='redis', **conf) # 添加任務(wù)持久化存儲方式,如果未安裝redis可省略此步驟
方法2
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } scheduler = BackgroundScheduler() # .. 此處可以編寫其他代碼 # 使用configure方法進(jìn)行配置 scheduler.configure(executors=executors)
8 啟動
scheduler.start()
對于BlockingScheduler ,程序會阻塞在這,防止退出,作為獨立進(jìn)程時使用。(可以用來生成靜態(tài)頁面)
對于BackgroundScheduler,可以在應(yīng)用程序中使用。不再以單獨的進(jìn)程使用。(如30分鐘內(nèi)取消訂單)
9 擴(kuò)展
任務(wù)管理
方式1
job = scheduler.add_job(myfunc, 'interval', minutes=2) # 添加任務(wù) job.remove() # 刪除任務(wù) job.pause() # 暫定任務(wù) job.resume() # 恢復(fù)任務(wù)
方式2
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 添加任務(wù) scheduler.remove_job('my_job_id') # 刪除任務(wù) scheduler.pause_job('my_job_id') # 暫定任務(wù) scheduler.resume_job('my_job_id') # 恢復(fù)任務(wù)
調(diào)整任務(wù)調(diào)度周期
job.modify(max_instances=6, name='Alternate name') scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')復(fù)制代碼 停止APScheduler運行 scheduler.shutdown()
10 綜合使用
這里提供30分鐘取消訂單支付的思路,可以使用Flask或者Django程序都能實現(xiàn),這里是在django應(yīng)用中動態(tài)的添加一個定時任務(wù),調(diào)度器需要使用BackgroundScheduler。下面先定義執(zhí)行訂單取消的任務(wù)。
from apscheduler.executors.pool import ThreadPoolExecutor from datetime import datetime, timedelta from apscheduler.schedulers.blocking import BackgroundScheduler from goods.models import SKU from orders.models import OrderGoods def cancel_order_job(order_id, sku_id, stock, sales): # 將訂單商品和訂單信息篩選出來 order_goods = OrderGoods.objects.filter( order_id=order_id, sku_id=sku_id) order_goods.delete() # 刪除訂單 try: sku = SKU.objects.get(id=sku_id) sku.stock += stock # 訂單刪掉后商品表里的庫存恢復(fù) sku.sales -= sales # 商品表里銷量還原 sku.save() except Exception as e: print(e)
具體操作哪些表要根據(jù)自身表的設(shè)計來定,大致是上面的思路。然后在生成訂單的視圖中同時生成取消訂單的任務(wù)。然后將取消訂單
cancel_order_job()
需要的參數(shù)傳遞過去,注意要判定當(dāng)前訂單的狀態(tài)為未支付狀態(tài)。
from datetime import datetime, timedelta class OrderCommitView(View): def post(self, request): # ... 此處省略生成訂單相關(guān)邏輯 if status == OrderInfo.STATUS.UNPADED: # 待支付狀態(tài) executors = { 'default': ThreadPoolExecutor(10) } now = datetime.now() delay = now + timedelta(minutes=30) # 從當(dāng)前下訂單延時30分鐘后 scheduler = BackgroundScheduler(executors=executors) # 添加定時任務(wù) scheduler.add_job(cancel_order_job, 'date', run_date=delay, args=[order_id, sku.id, sku.stock, sku.sales]) scheduler.start() # ....省略其他業(yè)務(wù)及返回
注意: 如果需要周期性的執(zhí)行一個定時任務(wù),如果用到了django中模型類或者Flask的配置信息等相關(guān)信息,需要將框架的配置信息導(dǎo)入。
總結(jié)
以上所述是小編給大家介紹的Python定時任務(wù)工具之APScheduler詳解,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
如果你覺得本文對你有幫助,歡迎轉(zhuǎn)載,煩請注明出處,謝謝!
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
