django apscheduler 定时任务,轮询执行某个任务(上篇)

这是一个 Django 应用程序,它为 APScheduler 添加了一个轻量级的包装器。它允许使用 Django 的 ORM 在数据库中存储持久作业。

django-apscheduler 是一个很好的选择,可以快速轻松地将基本调度功能添加到您的 Django 应用程序中,并且依赖最少,附加配置很少。理想的用例可能涉及按固定执行计划运行少量任务。

这种简单性的代价是您需要小心确保只有一个调度程序在特定时间点处于活动状态

这种限制源于这样一个事实,即 APScheduler 当前没有任何进程间同步和信令方案 ,可以在作业已添加、修改或从作业存储中删除时通知调度程序。

似乎计划在即将发布的 APScheduler 4.0 版本中支持多个调度程序之间的持久作业存储共享在此之前,一个典型的Django部署在生产中会启动多个工作进程,如果每个工作进程结束了运行其自己的调度那么这可能会导致工作中被遗漏或多次执行,以及重复的条目DjangoJobExecution表正在创建。

所以现在你的选择是:

  1. 使用自定义 Django 管理命令在其自己的专用进程中启动单个调度程序(推荐- 请参见runapscheduler.py下面示例);或者

  2. 实现您自己的远程处理 逻辑,以确保DjangoJobStore所有 Web 服务器的工作进程都可以以协调和同步的方式使用单个进程(对于大多数用例而言,额外的努力和增加的复杂性可能不值得);或者

  3. 选择的替代任务处理库确实使用某种像的Redis,RabbitMQ的,亚马逊SQS或类似共享消息代理的支持进程间通信(参见: https://djangopackages.org/grids/g/workers-queues-tasks /对于流行的选项)。

该软件包的特点包括:

  • 自定义DjangoJobStore:一个APScheduler 作业存储 ,将预定作业保存到 Django 数据库。您可以直接通过 Django 管理界面查看计划作业并监控作业执行情况:

    职位

  • 作业存储还维护当前计划作业的所有作业执行历史,以及状态代码和异常(如果有):

    职位

  • 注意: APScheduler 会 在作业的最后一次计划执行被触发后自动从作业存储中删除作业这也会从数据库中删除相应的作业执行条目(即,作业执行日志仅针对“活动”作业进行维护。)

  • 也可以通过DjangoJob管理页面手动触发作业执行

    职位

  • 注意:为了防止长时间运行的作业导致 Django HTTP 请求超时,通过 Django 管理站点启动的所有 APScheduler 作业的组合最大运行时间为 25 秒。这个超时值可以通过APSCHEDULER_RUN_NOW_TIMEOUT设置来配置

  • 安装

    pip  install  django - apscheduler
     

    快速开始

    • 像这样添加django_apscheduler到您的INSTALLED_APPS设置中:
    INSTALLED_APPS  = (
         # ... 
        "django_apscheduler" ,
    )
    • django-apscheduler 带有开箱即用的合理配置默认值。可以通过将以下设置添加到 Djangosettings.py文件来覆盖默认值
    • # Format string for displaying run time timestamps in the Django admin site. The default
      # just adds seconds to the standard Django format, which is useful for displaying the timestamps
      # for jobs that are scheduled to run on intervals of less than one minute.
      # 
      # See https://docs.djangoproject.com/en/dev/ref/settings/#datetime-format for format string
      # syntax details.
      APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a"
      
      # Maximum run time allowed for jobs that are triggered manually via the Django admin site, which
      # prevents admin site HTTP requests from timing out.
      # 
      # Longer running jobs should probably be handed over to a background task processing library
      # that supports multiple background worker processes instead (e.g. Dramatiq, Celery, Django-RQ,
      # etc. See: https://djangopackages.org/grids/g/workers-queues-tasks/ for popular options).
      APSCHEDULER_RUN_NOW_TIMEOUT = 25  # Seconds
      

        

      # runapscheduler.py
      import logging
      
      from django.conf import settings
      
      from apscheduler.schedulers.blocking import BlockingScheduler
      from apscheduler.triggers.cron import CronTrigger
      from django.core.management.base import BaseCommand
      from django_apscheduler.jobstores import DjangoJobStore
      from django_apscheduler.models import DjangoJobExecution
      
      
      logger = logging.getLogger(__name__)
      
      
      def my_job():
          #  Your job processing logic here... 
          pass
      
      def delete_old_job_executions(max_age=604_800):
          """This job deletes all apscheduler job executions older than `max_age` from the database."""
          DjangoJobExecution.objects.delete_old_job_executions(max_age)
      
      
      class Command(BaseCommand):
          help = "Runs apscheduler."
      
          def handle(self, *args, **options):
              scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
              scheduler.add_jobstore(DjangoJobStore(), "default")
              
              scheduler.add_job(
                  my_job,
                  trigger=CronTrigger(second="*/10"),  # Every 10 seconds
                  id="my_job",  # The `id` assigned to each job MUST be unique
                  max_instances=1,
                  replace_existing=True,
              )
              logger.info("Added job 'my_job'.")
      
              scheduler.add_job(
                  delete_old_job_executions,
                  trigger=CronTrigger(
                      day_of_week="mon", hour="00", minute="00"
                  ),  # Midnight on Monday, before start of the next work week.
                  id="delete_old_job_executions",
                  max_instances=1,
                  replace_existing=True,
              )
              logger.info(
                  "Added weekly job: 'delete_old_job_executions'."
              )
      
              try:
                  logger.info("Starting scheduler...")
                  scheduler.start()
              except KeyboardInterrupt:
                  logger.info("Stopping scheduler...")
                  scheduler.shutdown()
                  logger.info("Scheduler shut down successfully!")
      

        

      • ./manage.py runapscheduler每当启动为 Django 应用程序提供服务的 Web 服务器时,都应调用此管理命令如何以及在何处完成此操作的详细信息取决于具体实现,并取决于您使用的 Web 服务器以及您如何将应用程序部署到生产环境。对于大多数人来说,这应该涉及配置各种主管进程。

      • 像往常一样注册任何 APScheduler 作业。请注意,如果您尚未设置DjangoJobStore'default' 作业存储,则需要将其包含jobstore='djangojobstore'在您的scheduler.add_job()调用中。

      高级用法

      django-apscheduler 假设您已经熟悉 APScheduler 及其正确使用。如果没有,请转到项目页面并查看APScheduler 文档

      根据您的环境和用例,可以使用不同类型的调度程序如果您更喜欢运行 aBackgroundScheduler而不是使用 a BlockingScheduler,那么您应该知道将 APScheduler 与 uWSGI 一起使用需要一些额外的 配置步骤才能重新启用线程支持。

      支持的数据库

      请注意Django 官方支持的数据库列表django-apscheduler 可能不适用于不受支持的数据库,如 Microsoft SQL Server、MongoDB 等。

      数据库连接和超时

      django-apscheduler 依赖于标准的 Django 数据库配置设置这些设置与您的数据库的配置方式相结合,决定了您的特定部署将如何进行连接管理。

      如果您遇到任何类型的“丢失连接”错误,则可能意味着:

      • 您的数据库连接已超时。可能是时候开始考虑部署连接池程序(如 pgbouncer)来为您管理数据库连接。
      • 您的数据库服务器已崩溃/已重新启动。Django不会自动重新连接
原文地址:https://www.cnblogs.com/SunshineKimi/p/14879445.html