栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Python项目实战 3.3:异步消息队列.Celery+redis

Python项目实战 3.3:异步消息队列.Celery+redis

目录

一、生产者消费者设计模式

二、Celery介绍和使用


一、生产者消费者设计模式

问题:

我们的代码是自上而下同步执行的。发送短信是耗时的操作。如果短信被阻塞住,用户响应将会延迟。响应延迟会造成用户界面的倒计时延迟。

解决:

异步发送短信发送短信和响应分开执行,将发送短信从主业务中解耦出来。

那么,如何将发送短信从主业务中解耦出来,这是就用到生产者消费者设计模式介绍了。

 它是最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联。

生产者:生产者生成消息,缓存到消息队列中(生产任务);

消费者:消费者读取消息队列中的消息并执行(消费任务、执行任务);

由商城生成发短信的消息(任务),缓存到消息队列中,消费者读取消息队列中的发短信消息(任务)并执行。

 

轻量级的我们用celery+redis可满足,重量级的用celery+RabbitMQ

二、Celery介绍和使用

实际开发中,我们可以借助成熟的工具Celery来完成。有了Celery,我们在使用生产者消费者模式时,只需要关注任务本身,极大的简化了程序员的开发流程。

2.1 Celery介绍

一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。单个 Celery 进程每分钟可处理数以百万计的任务。通过消息进行通信,使用消息队列(broker)在客户端和消费者之间进行协调。

安装Celery:

$ pip install -U Celery

Celery官方文档

2.2 创建Celery实例并加载配置

1. 定义Celery包

2. 创建Celery实例

(由于短信平台认证比较麻烦,我们用email做例子)

celery_tasks.main.py

# Celery的入口
from celery import Celery


# 创建Celery实例
celery_app = Celery('wangye_mall')

# 加载配置
celery_app.config_from_object('celery_tasks.config')

# 注册任务
celery_app.autodiscover_tasks(['celery_tasks.email'])

celery_tasks.config.py

# Celery配置文件

# 指定中间人、消息队列、任务队列、容器,使用redis
broker_url = "redis://:123456@192.168.2.105/10"

redis如果没有设置密码就去掉密码 :123456

后面10是10号库,可自定义哪个库都可以

2.3 定义邮件任务,tasks.py是固定命名

tasks.py 

# 定义任务
from celery_tasks.main import celery_app

from .email import Email


# 使用装饰器装饰异步任务,保证celery识别任务
@celery_app.task(name='send_email_code')
def send_email_code(recipient, email_code):
    """
    发送邮件验证码的异步任务
    :param recipient: 收件人邮箱地址
    :param email_code: 验证码
    :return: 成功:0 、 失败:-1
    """

    email = Email()
    send_ret = email.send_mail(recipient, email_code)
    return send_ret

关于发邮件:https://blog.csdn.net/mengnf/article/details/122529911

email.py

import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr

from django import http
# from response_code import RETCODE


class Email(object):
    """发送email的单例类"""

    def __new__(cls, *args, **kwargs):
        # 判断是否存在类属性_instance,_instance是类的唯一对象,即单例
        if not hasattr(Email, "_instance"):
            cls._instance = super(Email, cls).__new__(cls, *args, **kwargs)
            cls._instance.server = smtplib.SMTP_SSL("smtp.qq.com", 465)  # 发件人邮箱中的SMTP服务器,端口是25
            cls._instance.server.login('123456789@qq.com', 'aaaaaaaaaaaaa')  # 发件人账号、密码
        return cls._instance

    def send_mail(cls, to_user, v_code):
        ret = True
        try:
            msg = MIMEText(f'验证码为:{v_code}', 'plain', 'utf-8')
            msg['From'] = formataddr(["望野科技", '123456789@qq.com'])  # 发件人昵称、账号
            msg['To'] = formataddr(["昵称", to_user])  # 收件人昵称、账号
            msg['Subject'] = "验证码"  # 邮件的主题,标题

            cls._instance.server.sendmail('123456789@qq.com', [to_user, ],
                                          msg.as_string())  # 发件人账号、收件人账号、发送内容
            cls._instance.server.quit()  # 关闭连接
        except Exception as e:
            ret = False

        # 响应结果
        if ret:
            return http.JsonResponse({'code': 0, 'errmsg': '邮件发送成功'})
        else:
            return http.JsonResponse({'code': '500', 'errmsg': '邮件发送失败'})

2.4 启动Celery服务

cd到项目目录下: 

celery -A celery_tasks.main worker -l info

-A指对应的应用程序, 其参数是项目中 Celery实例的位置。worker指这里要启动的worker。-l指日志等级,比如info等级。

2.5 调用发送短信任务

发送验证码的地方(我这里是verificationsviews.py):

from celery_tasks.email.tasks import send_email_code

# 发送短信验证码
# CCP().send_template_sms(参数)

# Celery异步发送短信验证码
send_email_code.delay(收件人, 验证码)

2.6 补充celery worker的工作模式

默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。如何自己指定进程数:celery worker -A proj --concurrency=4如何改变进程池方式为协程方式:celery worker -A proj --concurrency=1000 -P eventlet -c 1000

安装eventlet模块

pip install eventlet

启用 Eventlet 池

celery -A celery_tasks.main worker -l info -P eventlet -c 1000

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710151.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号