celery执行异步任务和定时任务

celery执行异步任务和定时任务

一、什么是celery

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。主要是执行 异步任务定时任务

二、celery架构

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

1、消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

2、任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

3、任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

三、celery安装和使用

1、安装

pip install celery

2、使用场景

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

3、执行异步任务

(1)创建任务

创建celery_task.py

import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密码
backend='redis://:123456@127.0.0.1:6379/1'       # 结果存储
broker='redis://:123456@127.0.0.1:6379/2'       # 消息中间件
cel=celery.Celery('test',backend=backend,broker=broker)

@cel.task
def add(x,y):
    return x+y

(2)添加任务(broker)

添加任务到消息中间件的队列中,但是没有执行任务

result不是函数执行结果,它是个对象

创建add_task.py

from celery_task import add
result = add.delay(4,5)
print(result.id)

(3)执行任务(worker)

创建py文件:run.py,执行任务,或者使用命令执行:celery worker -A celery_task -l info

注:windows下:celery worker -A celery_task -l info -P eventlet

常用命令来执行任务

from celery_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

(4)查看执行结果(result)

创建py文件:result.py,查看任务执行结果

from celery.result import AsyncResult
from celery_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')

4、多任务结构

pro_cel
    ├── celery_task  # celery相关文件夹
    │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
    │   └── tasks1.py    #  所有任务函数
    │	└── tasks2.py    #  所有任务函数
    ├── check_result.py 	# 检查结果
    └── send_task.py    # 添加任务

(1)celery.py --- 配置

from celery import Celery

broker='redis://127.0.0.1:6379/1',
backend='redis://127.0.0.1:6379/2',
cel = Celery('celery_demo', broker=broker, backend=backend,
             # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])

# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

(2)task1.py与task2.py --- 创建任务

tasks1.py

import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery任务结果:%s"%res

tasks2.py

import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2任务结果:%s"%res

(3)send_task.py --- 添加任务

from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# 立即告知celery去执行test_celery任务,并传入一个参数
result = test_celery.delay('第一个的执行')
print(result.id)
result = test_celery2.delay('第二个的执行')
print(result.id)

(4)通过命令执行任务

celery worker -A celery_task -l info -P eventlet

(5)check_result.py --- 查看任务结果

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除,执行完成,结果不会自动删除
    # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
    # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')

四、celery执行定时任务

(1)在指定时间执行任务

执行定时任务的创建任务、执行任务、查看任务结果与执行异步任务相同,不同的是在添加任务时,设定时间。

from celery_app_task import add
from datetime import datetime

# 方式一
# 在指定时间执行该任务
v1 = datetime(2019, 2, 13, 18, 19, 56)
print(v1)
# 当前时间对象,转成utc时间
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
result = add.apply_async(args=[1, 3], eta=v2)
print(result.id)



# 方式二
# 在当前时间的后延10s执行任务
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间,args是任务函数参数,eta指定时间
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

(2)每隔一段时间执行一次

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_task.tasks1',
    'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名
    'add-every-10-seconds': {
        # 执行tasks1下的test_celery函数
        'task': 'celery_task.tasks1.test_celery',
        # 每隔2秒执行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 传递参数
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    	  # 执行tasks1下的test_celery函数
    #     'task': 'celery_task.tasks1.test_celery',
    #     每年4月11号,8点42分执行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     每个月的11号,8点42分执行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11),
    #     每天8点42分执行
    #     'schedule': crontab(minute=42, hour=8),
    #     'args': (16, 16)
    # },
}

启动一个beatcelery beat -A celery_task -l info

启动work执行celery worker -A celery_task -l info -P eventlet

五、Django中使用Celery

1、在项目目录下创建celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30

2、在app01目录下创建tasks.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

3、视图函数views.py

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

4、settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND='redis'
BROKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
原文地址:https://www.cnblogs.com/linagcheng/p/10375187.html