栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Celery

Celery

1.基础 1.1 定义

Celery 是一个处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度

1.2 组成

Celery 的架构由三部分组成:

组成部分含义
消息中间件Celery 本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成.包括,RabbitMQ, Redis 等等
任务执行单元Worker是Celery 提供的任务执行的单元,worker 并发的运行在分布式的系统节点中
任务结果存储用来存储 Worker 执行的任务的结果,Celery支持以不同方式存储任务的结果,包括 AMQP,redis

另外,Celery 还支持不同的并发和序列化的手段

并发:Prefork, Eventlet, gevent, threads/single threaded序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等 1.3 使用场景

celery 是一个强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行.我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

异步任务:将耗时操作任务提交给 Celery 去异步执行,比如发送短信 / 邮件,消息推送,音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

1.4 优点
特点含义
简单Celery 使用和维护都非常简单,并且不需要配置文件
高可用woker和client会在网络连接丢失或者失败时,自动进行重试.并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用
快速单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟使(用 RabbitMQ,librabbitmq,和优化设置时)
灵活Celery几乎每个部分都可以扩展使用,自定义池实现,序列化,压缩方案,日志记录,调度器,消费者,生产者,broker传输等等
2.异步任务 2.1 安装
pip install Celery
2.2 基本使用

celery_task

#!/usr/bin/env python
import celery
import time
backend='redis://127.0.0.1:6379/1'
broker='redis://127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)

@cel.task
def send_email(name):
    print("向%s发送邮件..."%name)
    time.sleep(5)
    print("向%s发送邮件完成"%name)
    return "ok"  

produce_task.py:

#!/usr/bin/env python
from celery_task import send_email, cel
from celery.result import AsyncResult

result = send_email.delay("ja1")
print(result.id)


async_result=AsyncResult(id=result.id, app=cel)
if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

启动命令:

celery  -A celery_task worker -l info -P eventlet
2.3 多任务结构

celery_task/celery.py:

#!/usr/bin/env python
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
include_list = ['celery_tasks.task01', 'celery_tasks.task02']
cel = Celery('celery_demo', broker=broker, backend=backend,
             include=include_list)
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

celery_task/task01.py

import time
from celery_tasks.celery import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务"%res

celery_task/task02.py

import time
from celery_tasks.celery import cel
@cel.task
def send_msg(name):
    time.sleep(5)
    return "完成向%s发送短信任务"%name

celery_task/…/produce_task.py:

#!/usr/bin/env python
from celery_task import send_email, cel
from celery.result import AsyncResult

result = send_email.delay("ja1")
print(result.id)


async_result=AsyncResult(id=result.id, app=cel)
if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

启动命令

celery worker -A celery_task -l info -P eventlet -c 1000
3.4 Django中的应用

celery_task/main.py

# celery启动文件
from celery import Celery


# 创建celery实例
celery_app = Celery('ja')
# 把celery和django进行组合,识别和加载django的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
# 自动注册celery任务
celery_app.autodiscover_tasks(['celery_tasks.sms'])

celery_task/config.py

#!/usr/bin/env python

# 消息中间件
broker_url= "redis://guest:guest@127.0.0.1:6379/1"
# 任务结果存储
result_backend= "redis://guest:guest@127.0.0.1:6379/2"
# 时区
timezone = 'Asia/Shanghai'
# 是否使用UTC
enable_utc = False

celery_task/task01.py

import time
from celery_tasks.celery import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务"%res

celery_task/task02.py

import time
from celery_tasks.celery import cel
@cel.task
def send_msg(name):
    time.sleep(5)
    return "完成向%s发送短信任务"%name

celery_task/…/produce_task.py:

#!/usr/bin/env python
from celery_task import send_email, cel
from celery.result import AsyncResult

result = send_email.delay("ja1")
print(result.id)


async_result=AsyncResult(id=result.id, app=cel)
if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')

启动命令

celery -A celery_tasks.main worker -l info --concurrency=1000 -P eventlet -c 1000

-A指对应的应用程序, 其参数是项目中 Celery实例的位置。worker指这里要启动的worker。-l指日志等级,比如info等级。默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。如何自己指定进程数:celery worker -A proj --concurrency=4如何改变进程池方式为协程方式:celery worker -A proj --concurrency=1000 -P eventlet -c 1000 3.定时任务

crontab_task.py:

from celery_task import send_email
import datetime

v1 = datetime.datetime.now()
v1=v1+datetime.timedelta(seconds=5)
print(f"now time {v1}")
v2 = datetime.datetime.utcfromtimestamp(v1.timestamp())
result = send_email.apply_async(args=["ja",], eta=v2)
print(result.id)

多任务结构中 celery.py 修改如下:

#!/usr/bin/env python
from celery import Celery
from celery.schedules import crontab

# 消息中间件
broker_url = "redis://guest:guest@127.0.0.1:6379/1"
# 任务结果存储
result_backend = "redis://guest:guest@127.0.0.1:6379/2"
cel = Celery('tasks', broker=broker_url,
             backend=result_backend,
             include=['celery_tasks.task01', 'celery_tasks.task02',
                      ])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字随意命名
    'add-every-10-seconds': {
        # 执行tasks1下的test_celery函数
        'task': 'celery_tasks.task01.send_email',
        # 每隔1分钟执行一次
        'schedule': crontab(minute="*/1"),
        # 传递参数
        'args': ('ja',)
    },
    'add-every-12-seconds': {
        'task': 'celery_tasks.task01.send_email',
        # 每年4月11号,8点42分执行
        'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        'args': ('ja2',)
    }
}

#方式一
# Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
celery beat -A proj # 启动 Beat 程序  
celery -A proj worker -l info  #启动 worker 进程
#方式二
celery -B -A proj worker -l info

注意

启动定时任务需要清空消息队列broker_url 中的celery

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/729325.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号