Celery -- a Python-based third-party component

一. Celery -- a Python-based third-party component

1. Purpose

Celery is used to handle asynchronous and periodic tasks.

2. Components of Celery

(1) Task (the app)
(2) A cache that records the tasks (usually Redis or RabbitMQ)
    task records - broker
    task result records - backend
(3) Worker (the "employee")
    executes tasks on its own
    reports the results back on its own

3. A simple Celery example

s1.py

from celery import Celery
import time

# Redis connection URL format: redis://:password@hostname:port/db_number
my_task = Celery("my_task", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")  # the first argument is the internal name you give the task app

@my_task.task
def my_func1(a,b):
    time.sleep(10)
    return a+b

@my_task.task
def my_func2():
    time.sleep(10)
    return 2

@my_task.task
def my_func3():
    time.sleep(10)
    return 3

Run from the command line

Linux:   celery worker -A s1 -l INFO
Windows: celery worker -A s1 -l INFO -P eventlet
# On Windows the eventlet module must be installed, because Celery 4 does not natively support Windows
# -l: log level
# -c: concurrency (number of worker processes)
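
For example, a worker with INFO-level logging and a concurrency of 4 (the -c flag mentioned above) could be started like this:

celery worker -A s1 -l INFO -c 4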

s2.py

from s1 import my_func1, my_func2, my_func3

# delay() returns an AsyncResult; printing it shows the task id
pid = my_func1.delay(10, 20)
print(pid)
pid = my_func2.delay()
print(pid)
pid = my_func3.delay()
print(pid)

s3.py

from celery.result import AsyncResult
from s1 import my_task

# task id obtained by running s2.py
res = AsyncResult(id='2b36d20f-da07-42fe-b203-1e56fbaafd5e', app=my_task)
if res.successful():
    print(res.get())
else:
    print("The task is still running")

4. A simple crawler application

In caiji.py

import re
import urllib.request
import uuid

import requests
from celery import Celery

# Redis connection URL format: redis://:password@hostname:port/db_number
my_task = Celery("my_task", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")  # the first argument is the internal name you give the task app

# Get the album list for a category
@my_task.task
def getcontent():
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36"}
    url = "https://www.ximalaya.com/ertong/ertongwenxue"
    req = urllib.request.Request(url, headers=headers)
    response = urllib.request.urlopen(req)
    response_text = response.read().decode("utf-8")

    # Extract (title, album id) pairs and the matching author names
    title_id_list = re.findall(r'"album-title line-2 lg.+?" title="(.+?)" href="/ertong/(\d+?)/">', response_text)
    author_list = re.findall(r'"album-author.+?" title="(.+?)" href', response_text)

    response_list = []
    for i in range(len(title_id_list)):
        response_list.append({
            "title": title_id_list[i][0],
            "id": title_id_list[i][1],
            "author": author_list[i],
        })
    return response_list


# Download the mp3 binary for one album id and save it under ./music/
@my_task.task
def getmusic(id):
    url = "http://m.ximalaya.com/ertong/" + id + "/"
    response = requests.get(url)
    response.encoding = "utf-8"
    # Pull the audio source URL out of the page
    path = re.findall('"isCopyright":.+?"src":"(.+?)","albumId"', response.text)[0]
    d_data = requests.get(path)
    name = "./music/" + str(uuid.uuid4()) + ".mp3"
    with open(name, "wb") as f:
        f.write(d_data.content)
    # Return the saved file path; a Flask Response (send_file) cannot be serialized as a task result
    return name

In results.py

from caiji import getcontent, getmusic

res1 = getcontent.delay()
print(res1)
# res1.get() blocks until the worker has produced the album list
for i in res1.get():
    res2 = getmusic.delay(i["id"])
    print(res2)
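
To actually wait for the downloads instead of only printing the AsyncResult objects, results.py could be extended along these lines (a minimal sketch, assuming getmusic returns the saved file path as above):

from caiji import getcontent, getmusic

# Block until the album list is available, then queue one download per album
song_list = getcontent.delay().get()
download_results = [getmusic.delay(item["id"]) for item in song_list]

# Wait for every download task and print where each file was written
for r in download_results:
    print("saved to:", r.get())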

5. Scheduled tasks (run a function 10 seconds later)

In s4.py

from celery import Celery

my_task = Celery("task", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")  # the first argument is the internal name of the task app

@my_task.task
def my_func1(a, b):
    return 1

In s5.py

import datetime
import time
from s4 import my_func1

# eta expects a UTC datetime: take the current UTC time and add 10 seconds
tp = time.time()
utc_time = datetime.datetime.utcfromtimestamp(tp)
add_time = datetime.timedelta(seconds=10)
utc_time = utc_time + add_time

# apply_async with eta schedules the task to run at that point in time
res = my_func1.apply_async(args=(2, 3), eta=utc_time)
print(res)
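
Celery's apply_async also accepts a countdown argument, which avoids computing the UTC time by hand; a minimal sketch of the same 10-second delay:

from s4 import my_func1

# countdown: number of seconds to wait before the worker may execute the task
res = my_func1.apply_async(args=(2, 3), countdown=10)
print(res)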

6. Periodic tasks

task_one.py

from celery import Celery
import time

my_task = Celery("my_task", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")

@my_task.task
def my_func1():
    time.sleep(10)
    return "runs every 10 seconds"

task_two.py

import time
from task_one import my_task

@my_task.task
def my_func2():
    time.sleep(5)
    return "runs every 5 seconds"

s6.py

from celery import Celery
from celery.schedules import crontab

celery_task = Celery("task",
                     broker="redis://127.0.0.1:6379",
                     backend="redis://127.0.0.1:6379",
                     include=["task_one","task_two"])

# Beat schedule configuration: produce task_one.my_func1 every 10 seconds
# and task_two.my_func2 every 5 seconds ("args" would pass positional arguments)
celery_task.conf.beat_schedule = {
    "each10s_task": {
        "task": "task_one.my_func1",
        "schedule": 10,  # every 10 seconds
        # "args": (10, 20)
    },
    "each5s_task": {
        "task": "task_two.my_func2",
        "schedule": 5,  # every 5 seconds
        # "args": (50, 60)
    },
}

Run the beat scheduler and a worker in two separate terminals (the app defined above lives in s6.py):

celery beat -A s6
celery worker -A s6 -l INFO -P eventlet
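
s6.py imports crontab but never uses it; for reference, a minimal sketch of a crontab-style entry (reusing the task_one.my_func1 task) that would fire once a day at 08:00 instead of every 10 seconds:

from celery import Celery
from celery.schedules import crontab

celery_task = Celery("task",
                     broker="redis://127.0.0.1:6379",
                     backend="redis://127.0.0.1:6379",
                     include=["task_one"])

celery_task.conf.beat_schedule = {
    "daily_8am_task": {
        "task": "task_one.my_func1",
        # crontab(minute=0, hour=8) fires every day at 08:00
        "schedule": crontab(minute=0, hour=8),
    },
}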

7. Celery project directory layout

In Celery_task/celery.py

from celery import Celery
my_task = Celery("task",
                 broker="redis://127.0.0.1:6379",
                 backend="redis://127.0.0.1:6379",
                 include=["Celery_task.task_one","Celery_task.task_two"])

In Celery_task/task_one.py

from Celery_task.celery import my_task

@my_task.task
def func1():
    return 1

@my_task.task
def func3():
    return 3

In Celery_task/task_two.py

from Celery_task.celery import my_task

@my_task.task
def func2():
    return 2

celery worker -A Celery_task -l INFO -P eventlet
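
A minimal sketch of a hypothetical caller script (e.g. my_run.py, placed next to the Celery_task package) that submits the packaged tasks once the worker is running:

# my_run.py -- hypothetical caller living next to the Celery_task/ package
from Celery_task.task_one import func1, func3
from Celery_task.task_two import func2

for task in (func1, func2, func3):
    res = task.delay()
    print(task.name, res.id)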


Original article: https://www.cnblogs.com/shanghongyun/p/10448749.html