栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Python定时任务封装

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python定时任务封装

import datetime
import logging

import pytz
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler

class Scheduler(object):
    """Small wrapper around an APScheduler scheduler fixed to one timezone.

    Besides delegating to the underlying scheduler, the wrapper keeps its own
    ``jobs`` dictionary (keyed by job id) describing every job registered
    through :meth:`add_job` and :meth:`add_cron`.
    """

    # Timezone used for the scheduler itself and for every trigger it creates.
    TIMEZONE_NAME = 'Asia/Shanghai'
    # Size of the default thread pool executor.
    MAX_WORKERS = 50

    def __init__(self, background=True):
        """Build and configure the underlying scheduler.

        :param background: when true a ``BackgroundScheduler`` is created,
            otherwise a ``BlockingScheduler``.
        """
        jobstores = {}
        executors = {
            'default': ThreadPoolExecutor(self.MAX_WORKERS),
        }
        job_defaults = {
            'coalesce': True,
            'max_instances': 1,
        }
        self.timezone = pytz.timezone(self.TIMEZONE_NAME)
        scheduler_cls = BackgroundScheduler if background else BlockingScheduler
        self.scheduler = scheduler_cls(jobstores=jobstores, executors=executors,
                                       job_defaults=job_defaults,
                                       timezone=self.timezone)
        self.jobs = {}
        self.scheduler.add_listener(self.my_listener,
                                    EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    def add_job(self, func, seconds=0, minutes=0, args=None, job_id=None, max_instances=1):
        """Register ``func`` as an interval job whose first run is due now.

        :param func: callable to run; ``None`` is ignored silently.
        :param seconds: seconds component of the interval.
        :param minutes: minutes component of the interval.
        :param args: positional arguments passed to ``func``.
        :param job_id: job identifier; defaults to ``func.__name__``.
        :param max_instances: forwarded to the scheduler's ``add_job``.
        """
        if func is None:
            return
        if job_id is None:
            job_id = func.__name__
        # Use an aware "now" in the scheduler's timezone. A naive
        # datetime.now() is host-local wall-clock time, which the scheduler
        # would reinterpret in its own timezone and shift the first run on
        # hosts that are not in that zone.
        first_run = datetime.datetime.now(self.timezone)
        self.scheduler.add_job(func, 'interval', args=args, next_run_time=first_run,
                               seconds=seconds, minutes=minutes, id=job_id, name=job_id,
                               max_instances=max_instances)
        # Record the metadata only after the scheduler accepted the job, so a
        # rejected add does not leave a stale or overwritten entry behind.
        self.jobs[job_id] = {
            'job_id': job_id,
            'job_func': func,
            'args': args,
            'interval': minutes * 60 + seconds,
        }

    def add_cron(self, func, minute='0', hour='*', day='*', month='*', day_of_week='*',
                 args=None, job_id=None, max_instances=1):
        """Register ``func`` as a cron job evaluated in the wrapper's timezone.

        :param func: callable to run; ``None`` is ignored silently.
        :param minute: cron ``minute`` field.
        :param hour: cron ``hour`` field.
        :param day: cron ``day`` field.
        :param month: cron ``month`` field.
        :param day_of_week: cron ``day_of_week`` field.
        :param args: positional arguments passed to ``func``.
        :param job_id: job identifier; defaults to ``func.__name__``.
        :param max_instances: forwarded to the scheduler's ``add_job``.
        """
        if func is None:
            return
        if job_id is None:
            job_id = func.__name__
        self.scheduler.add_job(func, 'cron', args=args, minute=minute, hour=hour, day=day,
                               month=month, day_of_week=day_of_week, id=job_id, name=job_id,
                               max_instances=max_instances, timezone=self.timezone)
        # Recorded after the scheduler call for the same reason as in add_job.
        self.jobs[job_id] = {
            'job_id': job_id,
            'job_func': func,
            'args': args,
            'hour': hour,
            'minute': minute,
            'day': day,
            'month': month,
            'day_of_week': day_of_week,
        }

    def remove_job(self, job_id):
        """Remove ``job_id`` from the scheduler and from the local registry.

        Errors raised by the underlying scheduler propagate unchanged.
        """
        self.scheduler.remove_job(job_id)
        # The id may be unknown to the registry (for example a job added
        # directly on the object returned by get_scheduler()); that must not
        # raise after the scheduler has already removed the job.
        self.jobs.pop(job_id, None)

    def get_jobs(self):
        """Return the metadata dictionaries of all jobs tracked by the wrapper."""
        return self.jobs.values()

    def get_scheduler_jobs(self):
        """Return the job list reported by the underlying scheduler."""
        return self.scheduler.get_jobs()

    def start(self):
        """Start the underlying scheduler."""
        self.scheduler.start()

    def stop(self):
        """Shut the underlying scheduler down."""
        self.scheduler.shutdown()

    def remove_all_jobs(self):
        """Remove every job from the scheduler and empty the local registry."""
        self.scheduler.remove_all_jobs()
        # Keep get_jobs() in step with the scheduler.
        self.jobs.clear()

    def my_listener(self, event):
        """Log the exception carried by a job event, if there is one.

        Registered in ``__init__`` for executed and error job events.
        """
        if event.exception:
            logging.getLogger(__name__).error(
                'Scheduled job %s raised %r',
                getattr(event, 'job_id', None), event.exception)

    def get_scheduler(self):
        """Return the underlying scheduler object."""
        return self.scheduler

使用方法

class CronJob(object):
    """Usage example: registers the sample job on a ``Scheduler`` and runs it."""

    def __init__(self, app):
        """Keep a reference to ``app`` and create the scheduler wrapper.

        :param app: object whose ``logger`` attribute is used by :meth:`init`.
        """
        # init() logs through self.app.logger, so the app must be stored.
        self.app = app
        # Scheduler only takes the ``background`` flag; the default
        # (background scheduler) is used here.
        self.scheduler = Scheduler()

    def init(self):
        """Register the sample job, logging success or any failure via the app logger."""
        try:
            # Test job: runs once per minute.
            self.scheduler.add_job(test, seconds=0, minutes=1, args=None, job_id="test", max_instances=1)
            self.app.logger.info('test intervaljob Added!')

        except Exception as err:
            self.app.logger.exception(err)

    def run(self):
        """Start the scheduler."""
        self.scheduler.start()

def test():
    """Sample job body: print a fixed greeting to standard output."""
    greeting = 'hello world'
    print(greeting)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/330937.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号