栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Django+Celery的某个任务撤销

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Django+Celery的某个任务撤销

前提:需要做个对某个任务暂停(撤销)的同时且不影响其他任务继续执行的功能 查找资料得知celery3.0版本后加有revoke 功能

celery的官方文档对revoke的描述

下面开始

Django=3.2.8 (这个版本不重要,重要的是下面的那两个)
django-celery=3.3.1
celery=3.1.26.post2
两个celery版本不搭的话用起来很麻烦
django-celery这个包支持的最大celery好像就是celery3…
他这个django-celery好久不更新了,有个新的叫django-celery-base,有时间再看

还有个重要的地方!!!
还有个重要的地方!!!
还有个重要的地方!!!
忘记了python几之后async成了内置关键词了
而celery版本比较早,所以和这个async关键词冲突了,如果不能升/降级python版本的话
建议修改celery中的这个async关键词(个人推荐这种方法,升/降python版本太麻烦,保不齐还有别的不能用的)

  File "/home/lice/.local/lib/python3.8/site-packages/celery/utils/timer2.py", line 19
    from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
               ^
SyntaxError: invalid syntax
#这里一定要看,甭管他弹出多少行错误,找准这个async这一行给他改了


不止这一处需要修改
不止这一处需要修改
不止这一处需要修改
就多启动项目几次,他报一次改一次,直到改的不报错就好了(挺扯的= =||)

代码功能描述
发布celery任务,同时记录任务id(task.id),在任务还在运行的时候revoke(task.id),进行销毁,就这


模型文件

项目根目录下的celery

我这里用的时rabbitMQ.

redis这样配一下就好了

下面开始任务和视图

from .tasks import *
from Dw_test.celery import app  # 这里是你的根目录(setting的那个目录)
from django.http import HttpResponse

from .models import Tasks
from .tasks import *


def ads(request, ls):
    tasks = test_app.apply_async(args=[ls])
    # 记住这个tasks,把他的ID记下来,后面撤销用
    print(tasks.id)
    Tasks.objects.create(task_id=tasks.id)
    # 数据库记录操作
    return HttpResponse(tasks.id)


def dels(req, ls):
    tasks_all = Tasks.objects.get(id=ls)
    app.control.revoke(tasks_all.task_id, terminate=True)
    # terminate的加上,不加上不会撤销= =,还可以加个signal='SIGKILL',但是我没测出来区别
    tasks_all.is_delete = True
    tasks_all.save()
    # 都是我的数据库记录操作
    return HttpResponse(tasks_all.task_id + " is delete")

views.py

from __future__ import absolute_import

from time import sleep

from billiard.exceptions import Terminated
from celery import shared_task


# 这里推荐使用shared_task,他能把你的任务函数变成全局的共享的,task不能,task还得指定目标celery
@shared_task(throws=(Terminated,))  # 这里这个terminated有用的,下面说
def test_app(x):
    print('-----Start-----')
    for i in range(x):
        sleep(1)
        print(i)
    print('-----Over-----')

tasks.py(得自己在app下手动建)

from django.urls import path

from .views import *

urlpatterns = [
    path('task/', ads),
    path('task_del/', dels),
]

这是app下的urls.py(自己手动建)

顺利的话就开始启动celery和项目了
Celery命令:celery -A 项目名 worker -l info
项目启动命令:python manage.py runserver
celery成功可见↓

下面开始测试

数越大时间越长,前面写的,可以改,返回的这个是任务id

可以看到已成功开始运行了
下面尝试终止这个任务


不出意外的话,会出现报错并成功撤销任务

解决方法就是我上面说的那个任务函数(task.py里的)加个抛出异常

这样

再试一次

成功停止

针对暂停和开启功能,可通过把未处理的数据保存的数据库,开启的时候从数据库处理未完成任务

以上!~

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/325751.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号