import datetime
import logging

import pytz
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler
class Scheduler(object):
    """Wrapper around an APScheduler scheduler that also keeps a job registry.

    ``self.scheduler`` is the underlying APScheduler instance and
    ``self.jobs`` maps each job id to a dict describing how the job was
    registered through :meth:`add_job` or :meth:`add_cron`.
    """

    def __init__(self, background=True):
        """Build the underlying scheduler and attach the event listener.

        Args:
            background: When truthy a ``BackgroundScheduler`` is created,
                otherwise a ``BlockingScheduler``.
        """
        jobstores = {}
        executors = {
            'default': ThreadPoolExecutor(50),
        }
        job_defaults = {
            'coalesce': True,
            'max_instances': 1,
        }
        # Single source of truth for the timezone used by the scheduler,
        # by cron triggers and by the initial run time of interval jobs.
        self.timezone = pytz.timezone('Asia/Shanghai')
        scheduler_cls = BackgroundScheduler if background else BlockingScheduler
        self.scheduler = scheduler_cls(jobstores=jobstores, executors=executors,
                                       job_defaults=job_defaults, timezone=self.timezone)
        self.jobs = {}
        self.scheduler.add_listener(self.my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    def add_job(self, func, seconds=0, minutes=0, args=None, job_id=None, max_instances=1):
        """Register ``func`` as an interval job whose first run is due now.

        Args:
            func: Callable to run; nothing is registered when it is ``None``.
            seconds: Seconds component of the interval.
            minutes: Minutes component of the interval.
            args: Positional arguments forwarded to ``func``.
            job_id: Identifier of the job; defaults to ``func.__name__``.
            max_instances: Forwarded to the scheduler's ``add_job``.
        """
        if func is None:
            return
        if job_id is None:
            job_id = func.__name__
        # Use an aware timestamp in the scheduler's timezone. A naive
        # datetime.now() holds the host's local wall-clock time, which is
        # off by the UTC-offset difference whenever the host does not run
        # in Asia/Shanghai.
        first_run = datetime.datetime.now(self.timezone)
        self.scheduler.add_job(func, 'interval', args=args, next_run_time=first_run,
                               seconds=seconds, minutes=minutes, id=job_id, name=job_id,
                               max_instances=max_instances)
        # Record the job only after the scheduler accepted it, so a rejected
        # add does not overwrite the registry entry.
        self.jobs[job_id] = {
            'job_id': job_id,
            'job_func': func,
            'args': args,
            'interval': minutes * 60 + seconds,
        }

    def add_cron(self, func, minute='0', hour='*', day='*', month='*', day_of_week='*',
                 args=None, job_id=None, max_instances=1):
        """Register ``func`` as a cron job evaluated in ``self.timezone``.

        Args:
            func: Callable to run; nothing is registered when it is ``None``.
            minute, hour, day, month, day_of_week: Cron field expressions
                forwarded to the scheduler's ``add_job``.
            args: Positional arguments forwarded to ``func``.
            job_id: Identifier of the job; defaults to ``func.__name__``.
            max_instances: Forwarded to the scheduler's ``add_job``.
        """
        if func is None:
            return
        if job_id is None:
            job_id = func.__name__
        self.scheduler.add_job(func, 'cron', args=args, minute=minute, hour=hour, day=day,
                               month=month, day_of_week=day_of_week, id=job_id, name=job_id,
                               max_instances=max_instances, timezone=self.timezone)
        # Record the job only after the scheduler accepted it.
        self.jobs[job_id] = {
            'job_id': job_id,
            'job_func': func,
            'args': args,
            'hour': hour,
            'minute': minute,
            'day': day,
            'month': month,
            'day_of_week': day_of_week,
        }

    def remove_job(self, job_id):
        """Remove the job from the scheduler and drop it from the registry.

        Any exception raised by the scheduler's ``remove_job`` propagates.
        """
        self.scheduler.remove_job(job_id)
        # The job may not have been added through this wrapper, so a missing
        # registry entry is not an error.
        self.jobs.pop(job_id, None)

    def get_jobs(self):
        """Return the registry entries (the values of ``self.jobs``)."""
        return self.jobs.values()

    def get_scheduler_jobs(self):
        """Return whatever the underlying scheduler's ``get_jobs()`` returns."""
        return self.scheduler.get_jobs()

    def start(self):
        """Start the underlying scheduler."""
        self.scheduler.start()

    def stop(self):
        """Shut the underlying scheduler down."""
        self.scheduler.shutdown()

    def remove_all_jobs(self):
        """Remove every job from the scheduler and empty the registry."""
        self.scheduler.remove_all_jobs()
        # Keep the registry in sync with the scheduler.
        self.jobs.clear()

    def my_listener(self, event):
        """Listener for job-executed and job-error events.

        Logs an error when the event carries an exception; events without
        one are ignored.
        """
        if event.exception:
            logging.getLogger(__name__).error(
                'Job %s raised an exception: %r', event.job_id, event.exception)

    def get_scheduler(self):
        """Return the underlying scheduler instance."""
        return self.scheduler
# Usage example
class CronJob():
    """Usage example: registers the periodic jobs of an application.

    ``app`` is expected to expose a ``logger`` attribute with ``info`` and
    ``exception`` methods, which ``init`` uses to report the outcome.
    """

    def __init__(self, app):
        """Keep a reference to ``app`` and create the scheduler wrapper."""
        # init() logs through self.app.logger, so the app must be stored.
        self.app = app
        # Scheduler takes only an optional ``background`` flag; the earlier
        # ``db_uri`` argument was an undefined name.
        self.scheduler = Scheduler()

    def init(self):
        """Register the jobs; a failure is logged instead of propagated."""
        try:
            # Test job: runs once per minute.
            self.scheduler.add_job(test, seconds=0, minutes=1, args=None, job_id="test", max_instances=1)
            self.app.logger.info('test intervaljob Added!')
        except Exception as err:
            self.app.logger.exception(err)

    def run(self):
        """Start the scheduler."""
        self.scheduler.start()
def test():
    """Sample job: print a greeting to standard output."""
    greeting = 'hello world'
    print(greeting)



