Apscheduler详解(转)

转载:https://blog.csdn.net/somezz/article/details/83104368/

一、内存存储器

  1   #coding:utf-8
  2   from apscheduler.schedulers.blocking import BlockingScheduler
  3   import datetime
  4   from apscheduler.jobstores.memory import MemoryJobStore
  5   from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
  6
  7   def my_job(id='my_job'):
  8       print (id,'-->',datetime.datetime.now())
  9   jobstores = {
 10       'default': MemoryJobStore()
 11
 12   }
 13   executors = {
 14       'default': ThreadPoolExecutor(20),
 15       'processpool': ProcessPoolExecutor(10)
 16   }
 17   job_defaults = {
 18       'coalesce': False,
 19       'max_instances': 3
 20   }
 21   scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
 22   scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True)
 23   scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10',\
 24                     end_date='2018-05-30')
 25   scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now')
 26  scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05')
 27   try:
 28       scheduler.start()
 29   except SystemExit:
 30       print('exit')
 31       exit()

二、数据库存储器

说明作业被添加到数据库中,程序中断后重新运行时会自动从数据库读取作业信息,而不需要重新再添加到调度器中,如果不注释 21-25 行添加作业的代码,则作业会重新添加到数据库中,这样就有了两个同样的作业,避免出现这种情况可以在 add_job 的参数中增加 replace_existing=True,

  1   #coding:utf-8
  2   from apscheduler.schedulers.blocking import BlockingScheduler
  3   import datetime
  4   from apscheduler.jobstores.memory import MemoryJobStore
  5   from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
  6   from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  7   def my_job(id='my_job'):
  8       print (id,'-->',datetime.datetime.now())
  9   jobstores = {
 10       'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
 11   }
 12   executors = {
 13       'default': ThreadPoolExecutor(20),
 14       'processpool': ProcessPoolExecutor(10)
 15   }
 16   job_defaults = {
 17       'coalesce': False,
 18       'max_instances': 3
 19   }
 20  scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
 21  scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True)
 22   scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10',\
 23                     end_date='2018-05-30')
 24   scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now')
 25   scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05')
 26   try:
 27       scheduler.start()
 28   except SystemExit:
 29       print('exit')
 30       exit()  

三、schedule常用方法

  1   scheduler.remove_job(job_id,jobstore=None)#删除作业
  2   scheduler.remove_all_jobs(jobstore=None)#删除所有作业
  3   scheduler.pause_job(job_id,jobstore=None)#暂停作业
  4   scheduler.resume_job(job_id,jobstore=None)#恢复作业
  5   scheduler.modify_job(job_id, jobstore=None, **changes)#修改单个作业属性信息
  6   scheduler.reschedule_job(job_id, jobstore=None, trigger=None,**trigger_args)#修改单个作业的触发器并更新下次运行时间
  7   scheduler.print_jobs(jobstore=None, out=sys.stdout)#输出作业信息

四、事件监听

  1   # coding:utf-8
  2   from apscheduler.schedulers.blocking import BlockingScheduler
  3   from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
  4   import datetime
  5   import logging
  6
  7   logging.basicConfig(level=logging.INFO,
  8                       format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
  9                       datefmt='%Y-%m-%d %H:%M:%S',
 10                       filename='log1.txt',
 11                       filemode='a')
 12
 13
 14   def aps_test(x):
 15       print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
 16
 17
 18   def date_test(x):
 19       print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
 20       print (1/0)
 21
 22
 23   def my_listener(event):
 24       if event.exception:
 25           print ('任务出错了!!!!!!')
 26       else:
 27           print ('任务照常运行...')
 28
 29   scheduler = BlockingScheduler()
 30   scheduler.add_job(func=date_test, args=('一次性任务,会出错',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
 31   scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
 32   scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
 33   scheduler._logger = logging
 34
 35   scheduler.start()  

在生产环境中,可以把出错信息换成发送一封邮件或者发送一个短信,这样定时任务出错就可以立马就知道

原文地址:https://www.cnblogs.com/wangbin2188/p/15665498.html