栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Celery介绍和基本使用(基于4.3.0版本)

Celery介绍和基本使用(基于4.3.0版本)

Celery介绍和基本使用

一、Celery介绍和基本使用

    Celery官方文档:

    Celery英文手册Celery中文手册

    Celery是什么?
    Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

    异步任务:将耗时的操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音频处理等等做一个定时任务,比如每天定时执行爬虫爬取指定内容还可以使用celery实现简单的分布式爬虫系统等等

    Celery 在执行任务时需要通过一个消息中间件(Broker)来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis

    Celery有以下优点:

    简单:Celery 易于使用和维护,并且它 不需要配置文件 ,并且配置和使用还是比较简单的(后面会讲到配置文件可以有)高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务快速:单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级灵活: Celery 几乎所有部分都可以扩展或单独使用,各个部分可以自定义。

    Celery执行流程图如下

二、Celery安装使用

    安装celery模块
    pip install celery==4.3.0

    Celery的默认broker(消息中间件)是RabbitMQ, 仅需配置一行就可以
    BROKER_URL = 'amqp://guest:guest@localhost:5672/yard'

    使用Redis做broker(消息中间件)也可以 本地安装redis数据库(redis安装流程不再重复)
    pip install redis

    注意:
    celery不支持在windows下运行任务,需要借助eventlet来完成
    pip install eventlet

三、Celery异步任务使用代码示例

    对比说明

      不使用Celery的情况下我们执行一个耗时的任务,创建一个app.py 文件

      import time
      def add(x,y):
         time.sleep(5)
         return x+y
      if **name** == '**main**':
         print('task start....')
         result = add.delay(2,3)
         print('task end....')
         print(result)
      

      运行代码发现出现5秒后打印了结果

      task start....
      task end....
      5
      

      使用Celery执行(异步任务调度的情况)

        step1:新建一个tasks.py文件

        import time
        from celery import Celery
        
        #消息中间件(使用的redis)
        broker = 'redis://localhost:6379/1'
        #结果存储(使用的redis)
        backend = 'redis://localhost:6379/2'
        #实例化Celery对象
        app = Celery(
           'celeryDemo',
           broker=broker,
           backend=backend
        )
        
        # 添加@app.task()装饰器,说明执行的任务是一个异步任务
        @app.task()
        def add(x,y):
           print('task enter ....')
           time.sleep(5)
           return x+y
        

        step2:修改app.py文件,代码如下

        from tasks import add
        
        if **name** == '**main**':
           print('task start....')
           result = add.delay(2,3)
           print('task end....')
           print(result)
        

        上述代码修改完毕后: 打开终端,进入项目下,运行 app.py文件

        python3 app.py
        

        注意:立即执行结果如下,此时我们可以看到反回了一个任务id(47ad971c-9a9a-44ca-bee4-a71f44eff048),并没有打印结果,因为没有执行worker端

        运行worker端 打开终端,进入项目下,输入如下命令:

        celery -A tasks worker -l info
        # windows下启动的时候,使用eventlet 方式
        # celery -A tasks worker --loglevel=info -P eventlet  -c 10 
        # -c是协程的数量,生产环境可以用1000
        

        结果如下:

        -------------- celery@DESKTOP-9G0AUUR v5.2.3 (dawn-chorus)
        --- ***** -----
        -- ******* ---- Windows-10-10.0.19041-SP0 2022-03-23 12:22:13
        + *** --- * ---
        + ** ---------- [config]
        + ** ---------- .> app:         celeryDemo:0x21eea2c6088
        + ** ---------- .> transport:   redis://127.0.0.1:6379/1
        + ** ---------- .> results:     redis://127.0.0.1:6379/2
        + ***--- * --- .> concurrency: 10 (eventlet)
        -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
        ---***** -----
        -------------- [queues]
                       .> celery           exchange=celery(direct) key=celery
        
        [tasks]
        . tasks.add
        
        [2022-03-23 12:22:13,568: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
        [2022-03-23 12:22:13,572: INFO/MainProcess] mingle: searching for neighbors
        [2022-03-23 12:22:14,628: INFO/MainProcess] mingle: all alone
        [2022-03-23 12:22:14,648: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.
        [2022-03-23 12:22:14,652: INFO/MainProcess] celery@DESKTOP-9G0AUUR ready.
        [2022-03-23 12:22:14,657: INFO/MainProcess] Task tasks.add[47ad971c-9a9a-44ca-bee4-a71f44eff048] received
        [2022-03-23 12:22:14,658: WARNING/MainProcess] task enter ....
        [2022-03-23 12:22:14,662: INFO/MainProcess] Task tasks.add[47ad971c-9a9a-44ca-bee4-a71f44eff048] succeeded in 0.0s: 5
        

        此时如果不断的向broker中发送消息,那么worker中就会接收到消息并执行 打开终端操作如下:

        (blog) G:PythonDjangocelery使用>python
        Python 3.7.4 (tags/v3.7.4:e09359112e, Jul  8 2019, 20:34:20) [MSC v.1916 64 bit (AMD64)] on win32
        Type "help", "copyright", "credits" or "license" for more information.
        >>> from tasks import add # 导入执行的方法
        >>> add.delay(2,5) # 发送异步任务消息
        
        >>> result = add.delay(3,5)
        >>> result.ready() # 查看异步任务是否执行完毕
        True
        >>> result.get() # 获取任务执行完毕后的结果
        8
        

    Celery的配置文件(优化版)
    由于Celery的配置信息比较多,通常情况下,我们会创建一个Celery的配置文件, 这里命名为 celeryconfig.py

    step1:
    在之前的项目中创建一个celery_demo的python Package文件夹 在__init__.py中添加如下代码:

    from celery import Celery
    
    # include:导入指定的任务模块
    # 这一次创建 app,并没有直接指定 broker(消息中间件来接收和发送任务消息) 和 backend(存储结果)。而是在配置文件中。
    app = Celery(
       'demo',
       include=[
          'celery_demo.task1',
          'celery_demo.task2',
       ]
    )
    # 通过Celery 实例加载配置模块
    app.config_from_object(
       'celery_demo.celeryconfig',
    )
    

    step2:
    在celery_demo文件夹下新建一个celeryconfig.py文件(Celery的配置文件) 添加如下代码:

    # 官方配置文档:查询每个配置项的含义。
    
    # 
    
    # broker(消息中间件来接收和发送任务消息)
    BROKER_URL = 'redis://localhost:6379/1'
    # backend(存储worker执行的结果)
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
    
    # 设置时间参照,不设置默认使用的UTC时间
    CELERY_TIMEZONE = 'Asia/Shanghai'
    # 指定任务的序列化
    CELERY_TASK_SERIALIZER='json'
    # 指定执行结果的序列化
    CELERY_RESULT_SERIALIZER='json'
    

    step3:在celery_demo文件夹下新建task1.py和task2.py文件

    task1.py文件中执行如下代码

    import time
    from celery_demo import app
    
    @app.task
    def add(x,y):
       time.sleep(5)
       return x+y
    

    task2.py文件中执行如下代码

    import time
    from celery_demo import app
    
    @app.task
    def mut(x,y):
       time.sleep(5)
       return x*y
    

    step4:修改app.py文件,代码如下,分别发送执行任务消息到broker

    from celery_demo import task1,task2
    
    # apply_async和delay都表示调用异步任务
    task1.add.delay(2,4)
    
    # task1.add.apply_async(2,4)
    
    task2.mut.delay(3,4)
    
    print('end...')
    

    查看运行结果,打开终端启动worker,如下则表示启动成功:

    (blog) G:PythonDjangocelery使用>celery worker -A celery_demo -l INFO -P eventlet  -c 10  
    
    -------------- celery@DESKTOP-9G0AUUR v4.3.0 (rhubarb)
    ---- **** -----
    --- * **** -- Windows-10-10.0.19041-SP0 2022-03-23 16:26:31
    -- * - **** ---
    + ** ---------- [config]
    + ** ---------- .> app:         demo:0x2bc35c9ba08
    + ** ---------- .> transport:   redis://localhost:6379/1
    + ** ---------- .> results:     redis://localhost:6379/2
    + ***--- * --- .> concurrency: 10 (eventlet)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    ---***** -----
    -------------- [queues]
                   .> celery           exchange=celery(direct) key=celery
    
    [tasks]
    . celery_demo.task1.add
    . celery_demo.task2.mut
    
    [2022-03-23 16:26:39,476: INFO/MainProcess] Connected to redis://localhost:6379/1
    [2022-03-23 16:26:41,532: INFO/MainProcess] mingle: searching for neighbors
    [2022-03-23 16:26:48,671: INFO/MainProcess] mingle: all alone
    [2022-03-23 16:26:50,701: INFO/MainProcess] pidbox: Connected to redis://localhost:6379/1.
    [2022-03-23 16:26:54,765: INFO/MainProcess] celery@DESKTOP-9G0AUUR ready.
    
    

    发送执行任务消息,打开终端,进行如下操作

    (blog) G:PythonDjangocelery使用>python app.py
    end...
    

    此时会看到worker端接受到了两个任务并开始执行,如下所示:

    [2022-03-23 16:28:30,167: INFO/MainProcess] Received task: celery_demo.task1.add[308aa4ce-2443-4618-b2e5-6a6a8c3dac4b]  
    [2022-03-23 16:28:30,169: INFO/MainProcess] Received task: celery_demo.task2.mut[dc0eed0a-75bf-4c69-a691-9cc39513db18]  
    [2022-03-23 16:28:35,190: INFO/MainProcess] Task celery_demo.task1.add[308aa4ce-2443-4618-b2e5-6a6a8c3dac4b] succeeded in 5.01600000000326s: 6
    [2022-03-23 16:28:37,234: INFO/MainProcess] Task celery_demo.task2.mut[dc0eed0a-75bf-4c69-a691-9cc39513db18] succeeded in 7.061999999976251s: 12
    

四、定时任务

step1:
在之前代码的celery_demo文件夹下的celeryconfig.py文件中,添加如下代码

from datetime import timedelta
from celery.schedules import crontab
# 设置定时任务
CELERYBEAT_SCHEDULE = {
   'task1':{
      'task':'celery_demo.task1.add',
      'schedule':timedelta(seconds=10), 表示每10秒发送一次任务消息
      'args':(10,20)
   },
   'task2':{
      'task':'celery_demo.task2.mut',
      'schedule':crontab(hour=22,minute=24), #表示在每天的晚上10点24分发送一次任务消息
      'args':(10,10)
   }
}

step2:
启动定时消息任务,打开终端,执行如下命令

celery beat -A celery_demo -l INFO

启动

celery beat v4.3.0 (rhubarb) is starting.
__-    ...__   -        _
LocalTime -> 2022-03-23 16:19:33
Configuration ->
   . broker -> redis://localhost:6379/1
   . loader -> celery.loaders.app.AppLoader
   . scheduler -> celery.beat.PersistentScheduler
   . db -> celerybeat-schedule
   . logfile -> [stderr]@%INFO
   . maxinterval -> 5.00 minutes (300s)
[2022-03-23 16:19:33,228: INFO/MainProcess] beat: Starting...

如果要执行的是爬虫任务,只需要将上面的方法中的代码替换为爬虫代码即可。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774574.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号