Python任务调度模块APScheduler

一、APScheduler 是什么&APScheduler四种组成部分?

APScheduler全程为Advanced Python Scheduler,是一款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执行的任务,并且支持Python函数或任意可调用的对象。

1、调度器(scheduler)

调度器(scheduler)是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。 

对于不同的不场景,可以选择的调度器:

  • BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。

对于BlockingScheduler,程序则会阻塞在start()位置,所以,要运行的代码必须写在start()之前。

  • BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(常用)。
  • AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。
  • GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。
  • TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。
  • TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
  • QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。
# BackgroundScheduler: 调度器在后台线程中运行,不会阻塞当前线程。
from apscheduler.schedulers.background import BackgroundScheduler
import time
scheduler = BackgroundScheduler()
def job1():
    print "%s: 执行任务"  % time.asctime()
scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()
while True:
    pass

2、作业存储(job store)

作业存储(job store)主要用来存储被调度的作业,默认的作业存储是简单地把作业保存在内存中。

jobstore提供对scheduler中job的增删改查接口,根据存储方式的不同,分以下几种:

  • MemoryJobStore:没有序列化,jobs就存在内存里,增删改查也都是在内存中操作
  • SQLAlchemyJobStore:所有sqlalchemy支持的数据库都可以做为backend,增删改查操作转化为对应backend的sql语句
  • MongoDBJobStore:用mongodb作backend
  • RedisJobStore: 用redis作backend
  • RethinkDBJobStore: 用rethinkdb 作backend
  • ZooKeeperJobStore:用ZooKeeper做backend

3、执行器(executor)

执行器(executor)主要处理任务的运行,主要是把定时任务中的可调用对象(function)提交给一个一个线程或者进程来进行。当任务完成时,执行器将会通知调度器。

最常用的 执行器(executor) 有两种:

  • ProcessPoolExecutor(进程池)
  • ThreadPoolExecutor(线程池,max:10)

4、触发器(triggers)

当调度一个任务时,需要为它设置一个触发器(triggers)。触发器决定在什么日期/时间、用什么样的形式来执行执行这个定时任务。

APScheduler 有三种内建的 trigger:

  • date: 日期,指定某个确定的时间点,job仅执行一次。
  • interval: 间隔,指定时间间隔(fixed intervals)周期性执行。
  • cron: 周期,使用cron风格表达式周期性执行,用于(在指定时间内)定期运行job的场景。使用同linux下crontab的方式。
  • calendarinterval: 当您想要在一天中的特定时间以日历为基础的间隔运行任务时使用

一个任务也可以设定多种触发器,比如,可以设定同时满足所有触发器条件而触发,或者满足一项即触发。

4.1、date 定时调度(任务只会执行一次)

参数如下:

  • run_date (datetime|str) – 作业的运行日期或时间
  • timezone (datetime.tzinfo|str) – 指定时区
import datatime
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
def my_job(text):    print(text)
# 2016-12-12运行一次job_function
sched.add_job(my_job, 'date', run_date=date(2016, 12, 12), args=['text'])

# 2016-12-12 12:00:00运行一次job_function
# args=[]中是传给job_function的参数
sched.add_job(my_job, 'date', run_date=datetime.datetime(2016, 12, 12, 12, 0, 0), args=['text'])
scheduler.start()  # 开启

4.2、interval: 每隔一段时间执行一次

weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

  • weeks (int) – 间隔几周
  • days (int) – 间隔几天
  • hours (int) – 间隔几小时
  • minutes (int) – 间隔几分钟
  • seconds (int) – 间隔多少秒
  • start_date (datetime|str) – 开始日期
  • end_date (datetime|str) – 结束日期
  • timezone (datetime.tzinfo|str) – 时区
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job(text):    print(text)
# <!--每隔2小时执行一次-->
scheduler.add_job(my_job, 'interval', hours=2)

# <!--设置时间范围,在设置的时间范围内每隔2小时执行一次-->
scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00', end_date='2018-06-15 21:30:00)

# <!--使用装饰器的方式添加定时任务-->
@scheduler.scheduled_job('interval', id='my_job_id', hours=2)
def my_job():
    print("Hello World")
scheduler.start()

4.3、cron: 使用同linux下crontab的方式

(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

除了week和 day_of_week,它们的默认值是 *

  • 例如 day=1, minute=20 ,这就等于
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0

工作将在每个月的第一天以每小时20分钟的时间执行

表达式 参数类型 描述
* 所有 通配符。例: minutes=* 即每分钟触发
*/a 所有 可被a整除的通配符。
a-b 所有 范围a-b触发
a-b/c 所有 范围a-b,且可被c整除时触发
xth y 第几个星期几触发。x为第几个,y为星期几
last x 一个月中,最后个星期几触发
last 一个月最后一天触发
x,y,z 所有 组合表达式,可以组合确定值或上方的表达式
  • year (int|str) – 年,4位数字
  • month (int|str) – 月 (范围1-12)
  • day (int|str) – 日 (范围1-31)
  • week (int|str) – 周 (范围1-53)
  • day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) – 时 (范围0-23)
  • minute (int|str) – 分 (范围0-59)
  • second (int|str) – 秒 (范围0-59)
  • start_date (datetime|str) – 最早开始日期(包含)
  • end_date (datetime|str) – 最晚结束时间(包含)
  • timezone (datetime.tzinfo|str) – 指定时区

# 特点时间3点30分开始执行
sched.add_job(my_job, 'cron', hour=3, minute=30)
# 周一到周五特点时间5点30分开始执行
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-30')

# 装饰器方式设定任务
@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

二、APSched 使用

安装

  • pip安装
pip install apscheduler
python setup.py install

简单示例:

添加任务并设置触发方式为3s一次

注意: 调度器启动后,就不可以修改配置。

from apscheduler.schedulers.blocking import BlockingScheduler
import time
# 实例化一个调度器
scheduler = BlockingScheduler()
def job1():
    print "%s: 执行任务"  % time.asctime()
# 添加任务并设置触发方式为3s一次
scheduler.add_job(job1, 'interval', seconds=3)
# 开始运行调度器
scheduler.start()
------ 结果 --------
> python first.py
Fri Sep  8 20:41:55 2017: 执行任务
Fri Sep  8 20:41:58 2017: 执行任务
...

任务操作

1. 添加任务

添加任务方法有两种:

  1. 通过调用add_job()
  2. 通过装饰器scheduled_job()
  • 第一种方法是最常用的;第二种方法是最方便的,但缺点就是运行时,不能修改任务。
  • 第一种add_job()方法会返回一个apscheduler.job.Job实例,这样就可以在运行时,修改或删除任务。

在任何时候你都能配置任务。但是如果调度器还没有启动,此时添加任务,那么任务就处于一个暂存的状态。只有当调度器启动时,才会开始计算下次运行时间。

还有一点要注意,如果你的执行器或任务储存器是会序列化任务的,那么这些任务就必须符合:

  • 回调函数必须全局可用
  • 回调函数参数必须也是可以被序列化的

2. 移除任务

如果想从调度器移除一个任务,那么你就要从相应的任务储存器中移除它,这样才算移除了。有两种方式:

  • 调用remove_job(),参数为:任务ID,任务储存器名称
  • 在通过add_job()创建的任务实例上调用remove()方法

第二种方式更方便,但前提必须在创建任务实例时,实例被保存在变量中。对于通过scheduled_job()创建的任务,只能选择第一种方式。

当任务调度结束时(比如,某个任务的触发器不再产生下次运行的时间),任务就会自动移除。

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

同样,通过任务的具体ID:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

3. 暂停和恢复任务

通过任务实例或调度器,就能暂停和恢复任务。如果一个任务被暂停了,那么该任务的下一次运行时间就会被移除。在恢复任务前,运行次数计数也不会被统计。

暂停任务,有以下两个方法:

  • apscheduler.job.Job.pause()
  • apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复任务

  • apscheduler.job.Job.resume()
  • apscheduler.schedulers.base.BaseScheduler.resume_job()

4. 获取任务列表

通过get_jobs()就可以获得一个可修改的任务列表。get_jobs()第二个参数可以指定任务储存器名称,那么就会获得对应任务储存器的任务列表。

print_jobs()可以快速打印格式化的任务列表,包含触发器,下次运行时间等信息。

修改任务

通过apscheduler.job.Job.modify()或modify_job(),你可以修改任务当中除了id的任何属性。

比如:

job.modify(max_instances=6, name='Alternate name')

如果想要重新调度任务(就是改变触发器),你能通过apscheduler.job.Job.reschedule()或reschedule_job()来实现。这些方法会重新创建触发器,并重新计算下次运行时间。

比如:

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

5. 运行|关闭调度器

默认情况下,调度器会先把正在执行的任务处理完,再关闭任务储存器和执行器。但是,如果你就直接关闭,你可以添加参数:

scheduler.start()  # 运行
scheduler.shutdown()  # 关闭 默认情况下,等待所有当前任务执行完毕后关闭
scheduler.shutdown(wait=False)  # 强制关闭,不等待

6. 暂停、恢复任务进程

调度器可以暂停正在执行的任务:

scheduler.pause()

恢复任务:

scheduler.resume()

同时,也可以在调度器启动时,默认所有任务设为暂停状态。

scheduler.start(paused=True)

7. 意外情况监控

当程序发生报错,停止执行等情况时,添加监控任务

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

def my_listener(event):
    if event.exception:
        print '任务出错了!!!!!!'
    else:
        print '任务照常运行...'
scheduler = BlockingScheduler()
scheduler.add_job(func=date_test, args=('一定性任务,会出错',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

监控内容,加入log日志

import logging
logging.basicConfig(level=logging.INFO,
        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
         datefmt='%Y-%m-%d %H:%M:%S',
         filename='log1.txt',
         filemode='a')
scheduler._logger = logging

三、一些定时任务脚本示例

1、定时任务运行脚本每日凌晨00:30:30执行

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging

scheduler = BlockingScheduler()   # 后台运行

 # 设置为每日凌晨00:30:30时执行一次调度程序
@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')
def rebate():
        print "schedule execute"
        sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

if __name__ == '__main__':
    try:
        scheduler.start()
        sys_logging.debug("statistic scheduler start success")
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        sys_logging.debug("statistic scheduler start-up fail")

2、每天晚上0点 - 早上8点期间,每5秒执行一次任务。

# 每天晚上0点 - 早上8点期间,每5秒执行一次任务。
scheduler.add_job(my_job, 'cron',day_of_week='*',hour = '0-8',second = '*/5')

3、在0、10、20、30、40、50分时执行任务。

# 可以被10整除的时间点执行任务,这个任务就表示在0、10、20、30、40、50分时都会执行任务
scheduler.add_job(my_job, 'cron',day_of_week='*',minute = '*/10')

4、直到2020-05-30,每周从周一到周五的早上5:30都执行一次定时任务

# 直到2020-05-30 00:00:00,每周星期从星期一到星期五的早上5:30都执行一次定时任务
sched.add_job(my_job(),'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='2020-05-30')

# 截止到2016-12-30 00:00:00,每周一到周五早上五点半运行job_function
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')

5、在6,7,8,11,12月的第3个周五的1,2,3点执行定时任务

# job_function将会在6,7,8,11,12月的第3个周五的1,2,3点运行-->
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

6、每5秒执行该程序一次

#表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5
sched.add_job(my_job, 'cron',second = '*/5')
原文地址:https://www.cnblogs.com/hoyun/p/14389911.html