Integration with FastAPI and APScheduler [with ray]

Integration with FastAPI and APScheduler

https://www.cnblogs.com/lightsong/p/15054120.html

上篇博文,介绍了如何给 API Server添加 APScheduler, 以便之后后台的定时任务。

但是这里有一个问题, 如果执行的定时任务很是耗时, 则会在主进程(API server)占有大量的计算资源, 导致API server响应新的连接不及时。

这里引入 RAY 框架来专门解决这个问题。

Ray

https://github.com/ray-project/ray

本质上说, 此框架是一个支持分布式计算的框架, 并且支持 强化学习, 以及模型调参的工作。

An open source framework that provides a simple, universal API for building distributed applications. Ray is packaged with RLlib, a scalable reinforcement learning library, and Tune, a scalable hyperparameter tuning library.

支持三种模式:

寄生于宿主进程

独立进程

集群

样例代码, 在主进程中, 调用如下代码, 则会将remote标注的函数推送到  ray 工作进程, 此工作进程可以在任何一个主机上。

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))

https://zhuanlan.zhihu.com/p/111340572

在 Paper 里面描述了一个典型的远程调用流程:

Ray与Celery相比

celery也是一个分布式计算的框架。

但是celery部署work进程时候, 需要制定  task所在脚本,

这样工作进程的环境部署,是跟要执行的脚本强相关的。

但是Ray,更加类似Jenkins的主从模式, 可以将待执行的脚本推送到worker node上,然后执行,

这在应用部署上更加解耦, ray相当于是一个分布式运行环境, 可以提交任何的脚本到平台上执行。

类似 spark 平台。

https://github.com/fanqingsong/distributed_computing_on_celery/blob/master/tasks.py

# tasks.py
import time
from celery import Celery

celery = Celery('tasks', broker='pyamqp://localhost:5672')

@celery.task
def sendmail(mail):
    print('sending mail to %s...' % mail['to'])
    time.sleep(2.0)
    print('mail sent.')

https://github.com/fanqingsong/distributed_computing_on_celery/blob/master/taskscaller.py

# tasks caller

from tasks import sendmail

sendmail.delay(dict(to='celery@python.org'))

print("call done")

run

#run tasks proccess
pipenv run celery -A tasks worker --loglevel=info -P eventlet

# run producer
pipenv run python taskscaller.py

Ray Cluster Overview

https://docs.ray.io/en/master/cluster/index.html

What is a Ray cluster?

One of Ray’s strengths is the ability to leverage multiple machines in the same program. Ray can, of course, be run on a single machine (and is done so often), but the real power is using Ray on a cluster of machines.

A Ray cluster consists of a head node and a set of worker nodes. The head node needs to be started first, and the worker nodes are given the address of the head node to form the cluster:

../_images/ray-cluster.jpg

https://docs.ray.io/en/master/configure.html#cluster-resources

# To start a head node.
$ ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>

# To start a non-head node.
$ ray start --address=<address> --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>

# Specifying custom resources
ray start [--head] --num-cpus=<NUM_CPUS> --resources='{"Resource1": 4, "Resource2": 16}'

code refer

# Connect to ray. Notice if connected to existing cluster, you don't specify resources.
ray.init(address=<address>)

also refer to

https://docs.ray.io/en/releases-0.8.5/using-ray-on-a-cluster.html#deploying-ray-on-a-cluster

Autoscaling clusters with Ray

https://medium.com/distributed-computing-with-ray/autoscaling-clusters-with-ray-36bad4da6b9c

Ray Dashboard

https://docs.ray.io/en/master/ray-dashboard.html#ray-dashboard

提供了完备的后台诊断工具

(1)集群度量

(2)错误和异常,容易定位

(3)查看各个机器上的日志

。。。

Ray’s built-in dashboard provides metrics, charts, and other features that help Ray users to understand Ray clusters and libraries.

The dashboard lets you:

  • View cluster metrics.

  • See errors and exceptions at a glance.

  • View logs across many machines in a single pane.

  • Understand Ray memory utilization and debug memory errors.

  • See per-actor resource usage, executed tasks, logs, and more.

  • Kill actors and profile your Ray jobs.

  • See Tune jobs and trial information.

  • Detect cluster anomalies and debug them.

Logging directory structure

https://docs.ray.io/en/master/ray-logging.html#id1

By default, Ray logs are stored in a /tmp/ray/session_*/logs directory.

worker-[worker_id]-[job_id]-[pid].[out|err]: Python/Java part of Ray drivers and workers. All of stdout and stderr from tasks/actors are streamed here. Note that job_id is an id of the driver.

在代码中添加打印,辅助定位

import ray
# Initiate a driver.
ray.init()

@ray.remote
def task():
    print(f"task_id: {ray.get_runtime_context().task_id}")

ray.get(task.remote())

(pid=47411) task_id: TaskID(a67dc375e60ddd1affffffffffffffffffffffff01000000)

API for log

https://docs.ray.io/en/master/package-ref.html#runtime-context-apis

Runtime Context APIs

ray.runtime_context.get_runtime_context()[source]

Get the runtime context of the current driver/worker.

Example:

>>> ray.get_runtime_context().job_id # Get the job id.
>>> ray.get_runtime_context().get() # Get all the metadata.

PublicAPI (beta): This API is in beta and may change before becoming stable.

还可以查到 node_id, task_id

property job_id

Get current job ID for this worker or driver.

Job ID is the id of your Ray drivers that create tasks or actors.

Returns
If called by a driver, this returns the job ID. If called in

a task, return the job ID of the associated driver.

property node_id

Get current node ID for this worker or driver.

Node ID is the id of a node that your driver, task, or actor runs.

Returns

a node id for this worker or driver.

property task_id

Get current task ID for this worker or driver.

Task ID is the id of a Ray task. This shouldn’t be used in a driver process.

ray.wait() for Pipeline data processing

https://docs.ray.io/en/master/auto_examples/tips-for-first-time.html#tip-4-pipeline-data-processing

import time
import random
import ray

ray.init(num_cpus = 4)

@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4)) # Replace this is with work you need to do.
    return x

def process_incremental(sum, result):
    time.sleep(1) # Replace this with some processing code.
    return sum + result

start = time.time()
result_ids = [do_some_work.remote(x) for x in range(4)]
sum = 0
while len(result_ids):
    done_id, result_ids = ray.wait(result_ids)
    sum = process_incremental(sum, ray.get(done_id[0]))
print("duration =", time.time() - start, "
result = ", sum)

AsyncIOScheduler of APScheduler

https://apscheduler.readthedocs.io/en/stable/modules/schedulers/asyncio.html

AsyncIOScheduler was meant to be used with the AsyncIO event loop. By default, it will run jobs in the event loop’s thread pool.

If you have an application that runs on an AsyncIO event loop, you will want to use this scheduler.

异步调度器

https://stackoverflow.com/questions/63001954/python-apscheduler-how-does-asyncioscheduler-work

from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio

async def job():
    print('hi')

scheduler = AsyncIOScheduler()
scheduler.add_job(job, "interval", seconds=3)

scheduler.start()

asyncio.get_event_loop().run_forever()

Code Demo

https://github.com/fanqingsong/fastapi_apscheduler

Purpose

With the help of fastapi and apscheduler, implement API to get cpu rate and set/delete periodical cpu scan job.

reference: https://ahaw021.medium.com/scheduled-jobs-with-fastapi-and-apscheduler-5a4c50580b0e

 

Architecture

Seperate workload from fastapi server, in order to prevent the server from being too busy.

Select APScheduler as time policy manager.

Select Ray as logic node to execute workload.

The call from fastapi or apscheduler to ray cluster is asynchronous, so all the communication is reactive, no blocking status exists.

components

 

Description:

To demostrating how to use fastapi and apscheduler

Requirements: previde API to get CPU rate, and get it periodically

(1) get_cpu_rate -- get current cpu rate by this call

(2) set_cpu_scanner_job -- set one scheduled job to scan cpu rate periodically

(3) del_cpu_scanner_job -- delete the scheduled job

#FastAPI and Pydantic Related Libraries
from fastapi import FastAPI
from pydantic import BaseModel,Field
from typing import List
import asyncio

#APScheduler Related Libraries
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

import uuid

import logging
import psutil
from datetime import datetime
import os

import ray

import time

ray.init(address="192.168.1.10:6379")

# Global Variables
app = FastAPI(title="APP for demostrating integration with FastAPI and APSCheduler", version="2020.11.1",
              description="An Example of Scheduling CPU scanner info periodically")

Schedule = None

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CPURateResponse(BaseModel):
    cpu_rate:float=Field(title="CPU Rate", description="The current CPU rate")


class SetCPUScannerJobResponse(BaseModel):
    job_id:str=Field(title="CPU Scanner Job ID", description="CPU Scanner Job ID")


class DelCPUScannerJobResponse(BaseModel):
    job_id:str=Field(title="CPU Scanner Job ID", description="CPU Scanner Job ID")



@app.on_event("startup")
async def load_schedule_or_create_blank():
    """
    Instatialise the Schedule Object as a Global Param and also load existing Schedules from SQLite
    This allows for persistent schedules across server restarts.
    """
    print("#####startup event is called.")

    global Schedule
    try:
        jobstores = {
            'default': SQLAlchemyJobStore(url='sqlite:///../store/jobs.sqlite')
        }
        Schedule = AsyncIOScheduler(jobstores=jobstores)
        Schedule.start()
        # asyncio.get_event_loop().run_forever()
        logger.info("Created Schedule Object")
    except:
        logger.error("Unable to Create Schedule Object")


@app.on_event("shutdown")
async def pickle_schedule():
    """
    An Attempt at Shutting down the schedule to avoid orphan jobs
    """
    print("#####shutdown event is called.")

    global Schedule
    Schedule.shutdown()
    logger.info("Disabled Schedule")


@ray.remote
def get_cpu_rate_on_ray():
    logging.info("get_cpu_rate_on_ray called.")
    print("get_cpu_rate_on_ray called. !!")

    job_id = ray.get_runtime_context().job_id
    print(f"job_id={job_id}")

    # time.sleep(10)

    cpu_rate = psutil.cpu_percent(interval=1)

    logging.info(f"cpu_rate = {cpu_rate}")

    return cpu_rate

async def scan_cpu_rate(job_id):
    logging.info(f'###!!!!!!!!!!!!! Tick! call by apscheduler job {job_id}')

    future = get_cpu_rate_on_ray.remote()

    logging.info(future)

    cpu_rate = ray.get(future)

    logging.info(f"cpu_rate = {cpu_rate}")

@app.post("/get_cpu_rate/", response_model=CPURateResponse, tags=["API"])
def get_cpu_rate():
    future = get_cpu_rate_on_ray.remote()

    logging.info(future)

    cpu_rate = ray.get(future)

    logging.info(f"cpu_rate = {cpu_rate}")

    return {"cpu_rate": cpu_rate}


@app.post("/set_cpu_scanner_job/", response_model=SetCPUScannerJobResponse, tags=["API"])
def set_cpu_scanner_job():
    random_suffix = uuid.uuid1()
    job_id = str(random_suffix)

    cpu_scanner_job = Schedule.add_job(scan_cpu_rate, 'interval', seconds=30, id=job_id, args=[job_id])

    job_id = cpu_scanner_job.id
    logging.info(f"set cpu scanner job, id = {job_id}")

    return {"job_id": job_id}


@app.post("/del_cpu_scanner_job/", response_model=DelCPUScannerJobResponse, tags=["API"])
def del_cpu_scanner_job(job_id:str):

    Schedule.remove_job(job_id)

    logging.info(f"set cpu scanner job, id = {job_id}")

    return {"job_id": job_id}
出处:http://www.cnblogs.com/lightsong/ 本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。
原文地址:https://www.cnblogs.com/lightsong/p/15113834.html