django-celery异步发送邮箱

celery介绍

将redis发布订阅模式用做消息队列和rabbitmq的区别:

可靠性

redis:没有相应的机制保证消息的可靠消费,如果发布者发布一条消息,而没有对应的订阅者的话,这条消息将丢失, 不会存在内存中;rabbbitmq: 具有消息消费确认机制,如果发布一条消息,还没有消费者消费该队列,那么这条消息将一直存放在队列中,直到有消费者消费了该条消息,以此可以保证消息的可靠消费。

实时性

redis实时性高,redis作为高效的缓存服务器,所有数据都存在在服务器中,所以它具有更高的实时性消费者负载均衡;rabbitmq队列可以被多个消费者同时监控消费,但是每一条消息只能被消费一次,由于rabbitmq的消费确认机制,因此它能够根据消费者的消费能力而调整它的负载,redis发布订阅模式,一个队列 可以被多个消费者同时订阅,当有消息到达时,会将该消息依次发送给每个订阅者;

持久性

redis:redis的持久化是针对 于整个redis缓存的内容,它有RDB和AOF两个持久化方式,可以将整个redis实例持久化到磁盘,以此来做数据备份,防止异常情况下导致数据丢失。rabbitmq:队列、消息都可以选择性持久化,持久化粒度更小,更灵活;队列监控rabbitmq实现了后台监控平台,可以在该平台上看到所有创建的队列的详细情况,良好的台后管理平台可以方便我们更好的使用;redis没有所谓的监控平台。

总结:

redis:轻量级、低延迟,高并发、低可靠性;

rabbitmq:重量级、高可靠,异步,不保证实时

rabbitmq是一个专门的AMQP协议队列, 它的优势就在于提供可靠的队列服务,并且可做到异步,而redis主要是用于缓存的,redis的发布订阅模块,可用于实现及时性,且可靠性低的功能

名词

task(任务): 就是一个Python函数

queue(队列):将需要执行的任务加入到队列中

worker(工人):在一个新进程中,负责执行队列中的任务

borker(代理人):负责调度,在布置环境中使用redis

使用

安装包

  • pip install celery==3.1.25
  • pip install celery-with-redis==3.0
  • pip install django-celery==3.3.0
  • pip install django-redis==4.10.0
  • pip install redis==2.10.6
  • pip install itsdangerous == 1.1.0

在settings中配置

# INSTALLED_APPS中安装应用
ISNTALLED_APPS = [
    'djcelery',
]

# 配置邮件发送
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'  # 如果为163邮箱,设置为smtp.163.com
EMAIL_PORT = 465  # 或者 465/587是设置了 SSL 加密方式
# 发送邮件的邮箱
EMAIL_HOST_USER = '373576175@qq.com'
# 在邮箱中设置的客户端授权密码
EMAIL_HOST_PASSWORD = 'mfyyemfcydwtcabg'  # 第三方登陆使用的授权密码
EMAIL_USE_TLS = True  # 这里必须是 True,否则发送不成功
# 收件人看到的发件人, 必须是一直且有效的
EMAIL_FROM = '海上明月<370686999@qq.com>'
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER

在项目下,创建celery_tasks文件夹,在里面创建文件,init.py, celery.py,celeryconfig.py以及tasks.py文件

celery.py下写入的内容

from celery import Celery
from celery_tasks import celeryconfig # 导入celery配置文件

import os
# 为celery设置环境变量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mydemo.settings")

## 创建celery app
app = Celery('celery_tasks')

# 从单独的配置模块中加载配置
app.config_from_object(celeryconfig)

# 设置app自动加载任务
app.autodiscover_tasks(['celery_tasks'])

celeryconfig.py内写入的内容

# 设置结果存储
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/9'

# 设置代理人broker
BROKER_URL = 'redis://127.0.0.1:6379/8'

tasks.py写入的内容

from celery_tasks.celery import app as celery_app  # 导入创建好的celery应用
from django.core.mail import send_mail  # 使用django内置函数发送邮件
from django.conf import settings  # 导入django的配置


@celery_app.task
def send_mail_task(email, token):
    # 使用django内置函数发送邮件
    subject = "海上明月"
    message = ""
    sender = settings.EMAIL_FROM
    recipient = [email]
    html_message = "<h1>{}恭喜您成为美多商城会员,请点击以下链接进行激活邮箱:<a href='http://127.0.0.1:8000/mdadmin/active/{}'>
    http://127.0.0.1:8000/mdadmin/active/{}</a></h1>".format("海上明月", token, token)
    send_mail(subject, message, sender, recipient, html_message=html_message)

views.py

from django.http import HttpResponse
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from itsdangerous import SignatureExpired
from celery_tasks.tasks import send_mail_task
class SendEmailView(View):
    """
    发送邮件
    """
    def post(self, request):
        print(request.POST)
        email = request.POST.get('email')
        user_info = {"user": 1}
        serialize = Serializer(settings.SECRET_KEY, 1800)
        token = serialize.dumps(user_info).decode()
        send_mail_task.delay(email, token)
        return HttpResponse(json.dumps({'msg': 'OK', 'code': 200}))

激活用户

from itsdangerous import SignatureExpired


class ActiveView(View):
    """
    激活用户
    """
    def get(self, request, token):
        serialize = Serializer(settings.SECRET_KEY, 60)
        try:
            user_info = serialize.loads(token)
            print(user_info)
            user_id = user_info["user"]
            print(user_id)
            user = User.objects.get(pk=user_id)
            user.is_active = True
            user.save()
            return HttpResponse(json.dumps({'msg': '激活成功', 'code': 200}, ensure_ascii=False))
        except SignatureExpired:
            return HttpResponse(json.dumps({'msg': '激活链接失效,请重新发送邮件进行激活', 'code': 404}, ensure_ascii=False))

启动celery服务进行测试

# 启动celery服务进行测试
# 启动django服务
python manage.py runserver
# 启动celery的worker
celery -A celery_tasks worker -l info -P eventlet
原文地址:https://www.cnblogs.com/lichaoya/p/13741175.html