Celery Configuration Guide

1. Version

node2:/celery/djtest/djtest#celery --version
3.1.25 (Cipater)

2. Purpose

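When developing a project, some operations often take a long time (in production, longer than the nginx timeout), or need to run again at regular intervals. In both cases Celery is a good choice.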



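Celery is an asynchronous task queue, that is, a job queue based on distributed message passing.

Celery communicates through messages and uses a broker to mediate between clients and workers.

To start a task, the client sends a message to the queue, and the broker delivers it to the appropriate worker.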
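
The message flow described above can be imitated with the standard library alone. The following is only a toy sketch, not how Celery is implemented: an in-process queue stands in for the broker, a thread stands in for the worker, and the names broker, results, worker and delay are made up for the illustration.

```python
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

broker = queue.Queue()      # stands in for the message broker (Redis in this guide)
results = {}                # stands in for the result store


def worker():
    """Worker side: take messages off the queue and run them."""
    while True:
        message = broker.get()
        if message is None:  # sentinel value: stop the worker
            break
        task_id, func, args = message
        results[task_id] = func(*args)


def delay(task_id, func, *args):
    """Client side: enqueue the call and return immediately."""
    broker.put((task_id, func, args))


def add(a, b):
    return a + b


t = threading.Thread(target=worker)
t.start()
delay('task-1', add, 2, 3)   # the client does not wait for the result
broker.put(None)             # ask the worker to shut down
t.join()
print(results)
```

The client only puts a message on the queue; the work itself happens in the worker, which is why a slow operation no longer blocks the request that triggered it.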

The plan is to use Redis as the message broker and the Django database as the result store.
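
The original does not show the settings that select the broker and the result store. As a sketch only, a settings.py fragment using Celery 3.1 style setting names could look like the following. The broker URL is the one printed in the worker banner further down; the result backend path is the database backend provided by django-celery and is an assumption about the intended setup (the banner in this document reports redis for results, so the settings actually used may have differed).

```python
# djtest/settings.py (fragment, assumed values)

# Redis on the local machine, database 0, as the message broker.
BROKER_URL = 'redis://127.0.0.1:6379/0'

# Assumption: store task results in the Django database through
# django-celery (these are the celery_taskmeta / celery_tasksetmeta tables).
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
```

Because celery.py calls app.config_from_object('django.conf:settings'), Celery picks these values up from the Django settings module.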


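3. Installation

redis:
windows:
https://github.com/MSOpenTech/redis/releases/
linux:
yum install redis-server
(on CentOS/RHEL the package is usually named redis and comes from the EPEL repository)
PS: this has to be run from cmd; it does not work in PowerCmd, oddly.

pip install celery
pip install celery-with-redis
pip install django-celery

Create the project, then start the development server from the project directory:

 django-admin startproject djtest

 python manage.py runserver 0.0.0.0:10501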

4. Django code (whthas_home is the project, portal is the app)



Here whthas_home corresponds to /celery/djtest/djtest.

node2:/celery/djtest/djtest#cat __init__.py
#!/bin/python
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

# settings.py: portal and djcelery must be listed in INSTALLED_APPS
INSTALLED_APPS = [
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'suit',
    'django.contrib.admin',
    'DjangoUeditor',
    'portal',
    'djcelery',
]


Add the file celery.py:

node2:/celery/djtest/djtest#cat celery.py
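from __future__ import absolute_import

import os
from celery import Celery
from django.conf import settings

# Default Django settings module for the celery program.
# Replace whthas_home with your own project package (djtest in this walkthrough).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'whthas_home.settings')

app = Celery('portal')

# Read the Celery configuration from the Django settings,
# then look for a tasks.py module in every installed app.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


# Sample task that prints its own request context.
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
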
node2:/celery/djtest#python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, contenttypes, djcelery, sessions
Running migrations:
  Applying djcelery.0001_initial... OK
node2:/celery/djtest#


mysql> show tables;
+----------------------------+
| Tables_in_djtest           |
+----------------------------+
| auth_group                 |
| auth_group_permissions     |
| auth_permission            |
| auth_user                  |
| auth_user_groups           |
| auth_user_user_permissions |
| celery_taskmeta            |
| celery_tasksetmeta         |
| django_admin_log           |
| django_content_type        |
| django_migrations          |
| django_session             |
| djcelery_crontabschedule   |
| djcelery_intervalschedule  |
| djcelery_periodictask      |
| djcelery_periodictasks     |
| djcelery_taskstate         |
| djcelery_workerstate       |
+----------------------------+


node2:/celery/djtest#./manage.py startapp portal
node2:/celery/djtest#ls
db.sqlite3  djtest  manage.py  portal

Note: be sure to create the app first, and only then edit settings.py:

node2:/celery/djtest#vim djtest/settings.py

Add the portal app to INSTALLED_APPS.


Add the file portal/tasks.py:

from celery import task
from time import sleep


@task()
def Task_A(message):
    # Report intermediate progress through a custom PROGRESS state;
    # the meta dict is what AsyncResult.info returns while the task runs.
    Task_A.update_state(state='PROGRESS', meta={'progress': 0})
    sleep(10)
    Task_A.update_state(state='PROGRESS', meta={'progress': 30})
    sleep(10)
    return message


def get_task_status(task_id):
    # Look up the task by id in the result backend.
    task = Task_A.AsyncResult(task_id)

    status = task.state
    progress = 0

    if status == 'SUCCESS':
        progress = 100
    elif status == 'FAILURE':
        progress = 0
    elif status == 'PROGRESS':
        # task.info holds the meta dict passed to update_state.
        progress = task.info['progress']

    return {'status': status, 'progress': progress}
	
5. Testing: start the worker

node2:/celery/djtest#python manage.py celeryd -l info
/usr/local/python27/lib/python2.7/site-packages/celery/platforms.py:812: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@node2 v3.1.25 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-2.6.32-431.el6.x86_64-x86_64-with-centos-6.5-Final
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         portal:0x1a90490
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . djtest.celery.debug_task
  . portal.tasks.Task_A

[2017-12-05 20:27:29,837: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2017-12-05 20:27:29,848: INFO/MainProcess] mingle: searching for neighbors
[2017-12-05 20:27:30,857: INFO/MainProcess] mingle: all alone
[2017-12-05 20:27:30,866: WARNING/MainProcess] /usr/local/python27/lib/python2.7/site-packages/celery/fixups/django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2017-12-05 20:27:30,866: WARNING/MainProcess] celery@node2 ready.


Test from the Django shell:

node2:/celery/djtest#python manage.py shell
Python 2.7.3 (default, Mar 30 2017, 20:15:12) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-17)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from portal.tasks import *
>>> t=Task_A.delay("heel2")

The worker log then shows the task being received and, about 20 seconds later, completed:


[2017-12-05 20:28:51,801: INFO/MainProcess] Received task: portal.tasks.Task_A[d6d3d300-1b5f-481a-be42-d21c1af99fd6]
[2017-12-05 20:29:11,828: INFO/MainProcess] Task portal.tasks.Task_A[d6d3d300-1b5f-481a-be42-d21c1af99fd6] succeeded in 20.024760922s: u'heel2'
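
While the task is running, a caller such as a Django view could poll get_task_status from portal/tasks.py until the task reaches a final state. The following is a minimal sketch of such a polling loop. The helper wait_for_task and the stand-in fake_get_task_status are hypothetical names introduced here so the example runs without a worker; in the project, get_task_status and the id from t.id would be passed in instead.

```python
import time

FINAL_STATES = ('SUCCESS', 'FAILURE')


def wait_for_task(get_status, task_id, interval=1.0, timeout=60.0):
    """Call get_status(task_id) until a final state or the timeout is reached."""
    deadline = time.time() + timeout
    while True:
        result = get_status(task_id)
        if result['status'] in FINAL_STATES or time.time() >= deadline:
            return result
        time.sleep(interval)


# Stand-in for portal.tasks.get_task_status: replays a fixed sequence
# of states so the sketch can run without Redis or a worker.
_fake_states = iter([
    {'status': 'PENDING', 'progress': 0},
    {'status': 'PROGRESS', 'progress': 30},
    {'status': 'SUCCESS', 'progress': 100},
])


def fake_get_task_status(task_id):
    return next(_fake_states)


final = wait_for_task(fake_get_task_status, 'hypothetical-task-id', interval=0.01)
print(final)
```

Passing the status function as an argument keeps the loop independent of Celery, so the same helper works with the real get_task_status unchanged.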


6. Defining tasks in the Django admin
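
Periodic tasks created in the admin are stored in the djcelery_periodictask and related tables listed earlier. For beat to read them, it is assumed here that the scheduler is set to the database scheduler shipped with django-celery. The fragment below is a sketch of the relevant settings, not the author's configuration; the schedule entry name, the 30 second interval and the argument are made-up examples of declaring a periodic task statically in settings.

```python
# djtest/settings.py (fragment, assumed values)
from datetime import timedelta

# Assumption: let beat load periodic tasks from the djcelery_* tables
# that are edited through the Django admin.
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

# A periodic task can also be declared statically in the settings.
CELERYBEAT_SCHEDULE = {
    'task-a-every-30-seconds': {           # hypothetical entry name
        'task': 'portal.tasks.Task_A',     # task name as listed by the worker
        'schedule': timedelta(seconds=30),
        'args': ('hello',),
    },
}
```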







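7. Running the tasks
Start the worker (the Redis broker must already be running): python manage.py celeryd -l info

Because this is a periodic task, the beat scheduler also has to be started: python manage.py celery beat
On the worker side you can then see: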

Original post: https://www.cnblogs.com/hzcya1995/p/13349388.html