Celery 是一个处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度
1.2 组成Celery 的架构由三部分组成:
| 组成部分 | 含义 |
|---|---|
| 消息中间件 | Celery 本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成.包括,RabbitMQ, Redis 等等 |
| 任务执行单元 | Worker是Celery 提供的任务执行的单元,worker 并发的运行在分布式的系统节点中 |
| 任务结果存储 | 用来存储 Worker 执行的任务的结果,Celery支持以不同方式存储任务的结果,包括 AMQP,redis |
另外,Celery 还支持不同的并发和序列化的手段
并发:Prefork, Eventlet, gevent, threads/single threaded序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等 1.3 使用场景
celery 是一个强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行.我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
异步任务:将耗时操作任务提交给 Celery 去异步执行,比如发送短信 / 邮件,消息推送,音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
1.4 优点| 特点 | 含义 |
|---|---|
| 简单 | Celery 使用和维护都非常简单,并且不需要配置文件 |
| 高可用 | woker和client会在网络连接丢失或者失败时,自动进行重试.并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用 |
| 快速 | 单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟使(用 RabbitMQ,librabbitmq,和优化设置时) |
| 灵活 | Celery几乎每个部分都可以扩展使用,自定义池实现,序列化,压缩方案,日志记录,调度器,消费者,生产者,broker传输等等 |
pip install Celery2.2 基本使用
celery_task
#!/usr/bin/env python
import celery
import time
backend='redis://127.0.0.1:6379/1'
broker='redis://127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def send_email(name):
print("向%s发送邮件..."%name)
time.sleep(5)
print("向%s发送邮件完成"%name)
return "ok"
produce_task.py:
#!/usr/bin/env python
from celery_task import send_email, cel
from celery.result import AsyncResult
result = send_email.delay("ja1")
print(result.id)
async_result=AsyncResult(id=result.id, app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')
启动命令:
celery -A celery_task worker -l info -P eventlet2.3 多任务结构
celery_task/celery.py:
#!/usr/bin/env python
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
include_list = ['celery_tasks.task01', 'celery_tasks.task02']
cel = Celery('celery_demo', broker=broker, backend=backend,
include=include_list)
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
celery_task/task01.py
import time
from celery_tasks.celery import cel
@cel.task
def send_email(res):
time.sleep(5)
return "完成向%s发送邮件任务"%res
celery_task/task02.py
import time
from celery_tasks.celery import cel
@cel.task
def send_msg(name):
time.sleep(5)
return "完成向%s发送短信任务"%name
celery_task/…/produce_task.py:
#!/usr/bin/env python
from celery_task import send_email, cel
from celery.result import AsyncResult
result = send_email.delay("ja1")
print(result.id)
async_result=AsyncResult(id=result.id, app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')
启动命令
celery worker -A celery_task -l info -P eventlet -c 10003.4 Django中的应用
celery_task/main.py
# celery启动文件
from celery import Celery
# 创建celery实例
celery_app = Celery('ja')
# 把celery和django进行组合,识别和加载django的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
# 自动注册celery任务
celery_app.autodiscover_tasks(['celery_tasks.sms'])
celery_task/config.py
#!/usr/bin/env python # 消息中间件 broker_url= "redis://guest:guest@127.0.0.1:6379/1" # 任务结果存储 result_backend= "redis://guest:guest@127.0.0.1:6379/2" # 时区 timezone = 'Asia/Shanghai' # 是否使用UTC enable_utc = False
celery_task/task01.py
import time
from celery_tasks.celery import cel
@cel.task
def send_email(res):
time.sleep(5)
return "完成向%s发送邮件任务"%res
celery_task/task02.py
import time
from celery_tasks.celery import cel
@cel.task
def send_msg(name):
time.sleep(5)
return "完成向%s发送短信任务"%name
celery_task/…/produce_task.py:
#!/usr/bin/env python
from celery_task import send_email, cel
from celery.result import AsyncResult
result = send_email.delay("ja1")
print(result.id)
async_result=AsyncResult(id=result.id, app=cel)
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 将结果删除
elif async_result.failed():
print('执行失败')
elif async_result.status == 'PENDING':
print('任务等待中被执行')
elif async_result.status == 'RETRY':
print('任务异常后正在重试')
elif async_result.status == 'STARTED':
print('任务已经开始被执行')
启动命令
celery -A celery_tasks.main worker -l info --concurrency=1000 -P eventlet -c 1000
-A指对应的应用程序, 其参数是项目中 Celery实例的位置。worker指这里要启动的worker。-l指日志等级,比如info等级。默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。如何自己指定进程数:celery worker -A proj --concurrency=4如何改变进程池方式为协程方式:celery worker -A proj --concurrency=1000 -P eventlet -c 1000 3.定时任务
crontab_task.py:
from celery_task import send_email
import datetime
v1 = datetime.datetime.now()
v1=v1+datetime.timedelta(seconds=5)
print(f"now time {v1}")
v2 = datetime.datetime.utcfromtimestamp(v1.timestamp())
result = send_email.apply_async(args=["ja",], eta=v2)
print(result.id)
多任务结构中 celery.py 修改如下:
#!/usr/bin/env python
from celery import Celery
from celery.schedules import crontab
# 消息中间件
broker_url = "redis://guest:guest@127.0.0.1:6379/1"
# 任务结果存储
result_backend = "redis://guest:guest@127.0.0.1:6379/2"
cel = Celery('tasks', broker=broker_url,
backend=result_backend,
include=['celery_tasks.task01', 'celery_tasks.task02',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False
cel.conf.beat_schedule = {
# 名字随意命名
'add-every-10-seconds': {
# 执行tasks1下的test_celery函数
'task': 'celery_tasks.task01.send_email',
# 每隔1分钟执行一次
'schedule': crontab(minute="*/1"),
# 传递参数
'args': ('ja',)
},
'add-every-12-seconds': {
'task': 'celery_tasks.task01.send_email',
# 每年4月11号,8点42分执行
'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
'args': ('ja2',)
}
}
#方式一 # Celery Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列 celery beat -A proj # 启动 Beat 程序 celery -A proj worker -l info #启动 worker 进程 #方式二 celery -B -A proj worker -l info
注意
启动定时任务需要清空消息队列broker_url 中的celery



