flask_apscheduler定时任务组件使用

Flask-APScheduler 是Flask框架的一个扩展库,增加了Flask对apScheduler的支持,可以用作特定于平台的调度程序(如cron守护程序或Windows任务调度程序)的跨平台。

APScheduler有三个可以使用的内置调度系统:

  • Cron式调度(可选的开始/结束时间)
  • 基于区间的执行(偶数间隔运行作业,可选的开始/结束时间)
  • 一次性延迟执行(在设定的日期/时间运行一次作业)

APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是:

  • triggers: 任务触发器组件,提供任务触发方式
  • job stores: 任务商店组件,提供任务保存方式
  • executors: 任务调度组件,提供任务调度方式
  • schedulers: 任务调度组件,提供任务工作方式

支持多种存储空间

  • RAM
  • 基于文件的简单数据库
  • SQLAlchem
  • MongoDB
  • Redis

简单使用

appblueprints/task.py

import uuid
import utils
from flask_apscheduler import APScheduler
from flask import Blueprint, jsonify, request



Taskapi = Blueprint("task", __name__, url_prefix="/task")

Scheduler = None
taskdict = {}

def init():
    global Scheduler
    Scheduler = APScheduler()
    return Scheduler


# 暂停任务
# http://127.0.0.1:5000/task/pause?id=2
@Taskapi.route('/pause', methods=['GET'])
def pause_job():
    job_id = request.args.get('id')
    Scheduler.pause_job(str(job_id))
    response = {}
    response["msg"] = "success"
    return jsonify(response)


# 恢复任务
# http://127.0.0.1:5000/task/resume?id=2
@Taskapi.route('/resume', methods=['GET'])
def resume_job():
    job_id = request.args.get('id')
    Scheduler.resume_job(str(job_id))
    response = {}
    response["msg"] = "success"
    return jsonify(response)


# 获取任务
# http://127.0.0.1:5000/task/getjobs
@Taskapi.route('/getjobs', methods=['GET'])
def get_task():
    # jobs = Scheduler.get_jobs()
    # print(str(pickle.dumps(jobs)))
    return jsonify(taskdict)


# 移除任务
@Taskapi.route('/removejob', methods=['GET'])
def remove_job():
    job_id = request.args.get('id')
    Scheduler.remove_job(str(job_id))
    response = {}
    response["msg"] = "success"
    return jsonify(response)



# 添加任务
# http://cab912dac880.ngrok.io/task/addjob?tasktype=interval&minute=10&psm=caijing.charge.union_service&tag=prod&env=product&chat_id=6911623998451269634
@Taskapi.route('/addjob', methods=['GET'])
def add_task():
    global taskdict
    psm = request.args.get('psm', "cmp.ecom.settle")
    tag = request.args.get('tag', "prod")
    env = request.args.get('env', "boe")
    chat_id = request.args.get('chat_id', "6911623998451269634")
    tasktype = request.args.get('tasktype', "interval")
    minute = request.args.get('minute', "10")
    minute = float(minute)
    response = {}
    response["msg"] = "success"
    seconds = minute * 60
    # trigger='cron' 表示是一个定时任务
    # if tasktype == 'corn':
    #     id = str(uuid.uuid4())
    #     response["taskid"] = id
    #     Scheduler.add_job(func=test, id='1', args=(1, 1), trigger='cron', day_of_week='0-6', hour=18, minute=24,
    #                       second=10, replace_existing=True)
    # trigger='interval' 表示是一个循环任务,每隔多久执行一次
    if tasktype == "interval":
        id = str(uuid.uuid4())
        response["taskid"] = id
        response["data"] = [psm,env,tag,str(seconds)+""]
        taskdict[id] = response["data"]
        Scheduler.add_job(func=utils.start, id=id, args=(psm,env,tag,seconds,id,minute,chat_id), trigger='interval', seconds=seconds,
                          replace_existing=True)
    else:
        response["id"] = ""
        response["msg"] = "tasktype 类型不存在"

    return jsonify(response)

main.py

from flask import Flask
from appblueprints import task

# 创建app
app = Flask(__name__)# 注册蓝图
app.register_blueprint(task.Taskapi)

if __name__ == '__main__':
    scheduler = task.init()
    scheduler.init_app(app=app)
    scheduler.start()
    app.run(host="0.0.0.0", port=6000)

triggers: 支持三种任务触发方式

  • date:固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建

    | 参数 | 说明 |
    | :——————————– | :——————- |
    | run_date (datetime 或 str) | 作业的运行日期或时间 |
    | timezone (datetime.tzinfo 或 str) | 指定时区 |

    1
    2
    例如# 在 2019-4-24 00:00:01 时刻运行一次 start_system 方法
    scheduler .add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])
  • interval:时间间隔触发器,每个一定时间间隔执行一次。

    | 参数 | 说明 |
    | —————————- | ———- |
    | weeks (int) | 间隔几周 |
    | days (int) | 间隔几天 |
    | hours (int) | 间隔几小时 |
    | minutes (int) | 间隔几分钟 |
    | seconds (int) | 间隔多少秒 |
    | start_date (datetime 或 str) | 开始日期 |
    | end_date (datetime 或 str) | 结束日期 |

    1
    2
    # 在 2019-4-24 00:00:00 - 2019-4-24 08:00:00 之间, 每隔两小时执行一次 alarm_job 方法
    scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')
  • cron:cron风格的任务触发
参数说明
year (int 或 str) 表示四位数的年份 (2019)
month(int str) 月 (范围1-12)
day(int str) 日 (范围1-31)
week(int str) 周 (范围1-53)
day_of_week (int str) 表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示
hour (int str) 表示取值范围为0-23时
minute (int str) 表示取值范围为0-59分
second (int str) 表示取值范围为0-59秒
start_date (datetime str) 表示开始时间
end_date (datetime str) 表示结束时间
timezone (datetime.tzinfo str) 表示时区取值

(int|str) 表示参数既可以是int类型,也可以是str类型
(datetime | str) 表示参数既可以是datetime类型,也可以是str类型

例如:表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5

1
sched.add_job(my_job, 'cron',second = '*/5')

job stores: 支持四种任务存储方式

  • memory:默认配置任务存在内存中
  • mongdb:支持文档数据库存储
  • sqlalchemy:支持关系数据库存储
  • redis:支持键值对数据库存储

schedulers: 调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用

  • BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
  • BackgroundScheduler: 当 不运行其它框架 的时候使用,并使你的任务在 后台运行
  • AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
  • GeventScheduler: 和 gevent 框架配套使用
  • TornadoScheduler: 和 tornado 框架配套使用
  • TwistedScheduler: 和 Twisted 框架配套使用
  • QtScheduler: 开发 qt 应用的时候使用
原文地址:https://www.cnblogs.com/-wenli/p/14737476.html