APScheduler的简单使用

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import time
import logging

'''
APScheduler提供了七种类型的调度器:
(1)BlockingScheduler : 调度器在当前进程的主线程中执行,会阻塞当前线程
(2)BackgroundScheduler  :  调度器在后台线程中执行, 不会阻塞当前线程
(3)AsyncIOScheduler : 结合AsyncIo 模块(一个异步框架)一起使用

APScheduler触发器有3种:
    cron : 功能最强大
    interval: 周期性的执行
    date: 只执行一次

'''

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log.txt',
                    filemode='a')


def aps_test():
    print(datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S'), '你好')


def aps_test_name(name):
    """
    带参定时任务
    :param name:
    :return:
    """
    print(datetime.datetime.now().strftime('%Y-%m-%D %H-%M-%S'), name)


def aps_test_param(number):
    print('aps_test_param invoker....')
    # 如果number == 0 ,这里会报错,报错信息会打印到日志文件log.txt中
    # 虽然这儿会报错,但是定时任务并不会中断
    print(10 / number)


def job_listener(event):
    """
    APScheduler提供的监控功能
    :param event:
    :return:
    """
    if event.exception:
        # print(dir(event))
        print(event.job_id, '任务执行过程出错,发个邮件通知运维人员')
    else:
        # print('定时任务正常执行。。。。。')
        pass


scheduler = BlockingScheduler()
# 添加一个定时任务, 使用cron触发器
aps_test = scheduler.add_job(func=aps_test, trigger='cron', second='*/5')
# print(aps_test)
# 可以用来修改这个定时任务,比如间隔周期啥的
# aps_test.modify()

# 添加一个定时任务, 使用interval触发器
scheduler.add_job(func=aps_test_name, args=('美女',), trigger='interval', seconds=10, id='id1')

# 添加一个定时任务(只会执行1次),使用date触发器
scheduler.add_job(func=aps_test_param, args=(10,), trigger='date', run_date='2020-02-11 20:01:01')

# 添加一个定时任务,用来模拟定时任务中报错的情况
# 每个job默认都有一个job_id, 不过默认的job_id 是一串随机字符串,没有可读性,这儿显示声明一个job_id 以便于定位问题job
scheduler.add_job(func=aps_test_param, args=[0], trigger='interval', seconds=6, id='aps_test_param2')

# 添加监控
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# 日志
scheduler._logger = logging

scheduler.start()

"""
 以上是使用BlockingScheduler这个调度器。
 如果使用BackgroundScheduler这个调度器需要搞个主线程。

"""

。。。。

原文地址:https://www.cnblogs.com/z-qinfeng/p/12296676.html