栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Django2.0+celery+django-celery实现异步任务处理

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Django2.0+celery+django-celery实现异步任务处理

Django2.0+celery+django-celery实现异步任务处理 安装包
pip install celery
pip install redis
pip install django-celery

目录如下图:

在Django项目的settings.py文件同级创建celetyconfig.py和celery.py文件
  • celetyconfig.py(celery的配置文件)
# -*- coding:utf-8 -*-
import djcelery

djcelery.setup_loader()

# 设置任务队列
CELERY_QUEUES = {
    'work_queue': {
        'exchange': 'work_queue',
        'exchange_type': 'direct',
        'binding_key': 'work_queue'
    }
}

CELERY_DEFAULT_QUEUE = 'work_queue'

CELERY_importS = (
    'haohan.tasks',
)

# 防止死锁
CELERYD_FORCE_EXECV = True

# 并发数
CELERYD_ConCURRENCY = 4

# 任务重试
CELERY_ACKS_LATE = True

# 每个worker最多执行100个任务,防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD = 100

# 任务超时时间
CELERYD_TASK_TIME_LIMIT = 20 * 30
  • celery.py
# -*- coding:utf-8 -*-

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTING_MODULE', 'auto_test.settings')
app = Celery('auto_test')

app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
  • 在settings文件中增加配置
INSTALLED_APPS = [
    ....
    'djcelery',
]


# Celery,配置redis
from .celeryconfig import *
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
  • 在应用下创建tasks.py文件
# -*- coding:utf-8 -*-
from __future__ import absolute_import
import subprocess
import requests
import os

from framework.log import Logger
from haohan.views.base import plan_execute
from djangoAutoTest.celery import app

logger = Logger(file_name="open_api.log").get_logger()

app_package_name = {"haohan_mate": "com.haohan.chargehomeb", "zeekr_power": "com.geely.hhpower"}


@app.task(name="run_app_auto_test")
def run_app_auto_test(app_path, app_url, app_name):
    logger.info("开始执行异步任务: run_app_auto_test")
    try:
        with open(f"{app_path}.apk", "wb") as f:
            result = requests.get(app_url, stream=True, verify=False)
            f.write(result.content)
        logger.info("下包完成,开始装包")
        subprocess.run(f"adb uninstall {app_package_name[str(app_name).lower()]}")
        subprocess.run(f"adb install {app_path}.apk")
        process_id = str(app_url).split("/")[-1].split(".apk")[0]
        logger.info("开始执行计划")
        plan_execute(plan_id=str(app_name).lower(), sendFeishu=0, process_id=process_id, env="release")
        os.remove(f"{app_path}.apk")
    except Exception as e:
        logger.info(f"执行异步任务失败: {e}")

  • 在django的view中引调用异步任务
import datetime
import json
import os

from django.http import JsonResponse

from apiTest.framework.log import Logger
from appUITest.common import path_config
from haohan import tasks
from haohan.models import *

logger = Logger(file_name="open_api.log").get_logger()


app_package_name = {"haohan_mate": "com.haohan.chargehomeb", "zeekr_power": "com.geely.hhpower"}
base_url = "http://127.0.0.1:8000/haohan/get_auto_test_report/?process_id="


def get_apk(request):
    if request.method == "GET":
        return JsonResponse({"code": 10002, "msg": "请求方式有误"})
    if request.method == "POST":
        app_url = json.loads(request.body.decode()).get("url")
        app_name = json.loads(request.body.decode()).get("app_name")
        app_type = json.loads(request.body.decode()).get("app_type")
        if str(app_type).lower() == "ios":
            return JsonResponse({"code": 10006, "msg": "暂不支持iOS自动化"})
        plan_id = Plan.objects.filter(plan_id=app_name)
        if plan_id.count() == 0:
            return JsonResponse({"code": 10004, "msg": "此项目暂无自动化测试计划"})
        if plan_id[0].exe_flag == "1":
            return JsonResponse({"code": 10005, "msg": "该项目关联的自动化计划正在运行,请稍后重试!"})
        try:
            logger.info(f"获取的app信息: {app_url},{app_name}, {app_type}")
            current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            new_app_name = app_name + "_" + current_time
            app_path = os.path.join(path_config.app_package_path, new_app_name)
            if app_url and app_name and app_type:
                if str(app_type).lower() == "android" and str(app_name).lower() in app_package_name.keys():
                    plan_id.update(exe_flag=1)
                    process_id = str(app_url).split("/")[-1].split(".apk")[0]
                    logger.info("开始调用异步执行")
                    tasks.run_app_auto_test.delay(app_path, app_url, app_name)
                    return JsonResponse({"code": 10000, "msg": "success", "report_url": f"{base_url}" + f"{process_id}"})
        except Exception as e:
            logger.error(f"执行app自动化失败:{e}")
            plan_id.update(exe_flag=0)
        else:
            return JsonResponse({"code": 10003, "msg": "参数有误"})
            
  1. 将get_apk配置在urls中
path(r'get_apk/', get_apk, name='get_apk'),
  1. 启动celery的worker
python3 manage.py celery -A haohan.tasks worker -l INFO
  1. 访问view的get_apk接口,即可异步执行任务

记录下遇到的问题

1.运行celery报错:ModuleNotFoundError: No module named ‘celery.five‘
解决办法:

celery降版本,大部分可能是windows上出问题,celery4.0以上版本对windows不支持,我用了celery 3.1.25版本

2.运行celery报错:from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
^
SyntaxError: invalid syntax
我使用的解决办法:

将 kombu下的async 文件的文件名改成 asynchronous;然后把引用和这个文件的所有文件的里面的async改为asynchronous;


大概需要修改的地方有以下几个文件,可以运行一次celery,点击报错行跳进去改

3、运行celery报错:AttributeError: ‘str’ object has no attribute 'items’
解决办法:

redis的版本太高了,降低redis的版本
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/339718.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号