- 1.APScheduler简介
- 2.APScheduler调度方式
- 3.APScheduler组件
- 3.1 triggers
- 3.2 job stores:支持四种任务存储方式
- 3.3 schedulers:
APScheduler是一个Python库,可让您安排稍后执行的Python代码,是一套任务调度框架,可以用来做定时任务控制器,可以添加删除任务。如果将作业存储在数据库中,它们也将在调度程序重新启动后继续运行并保持其状态。重新启动调度程序后,它将运行它应该在脱机时运行的所有作业。
我这里先给出一个写调度api的模版吧
- 步骤一:写好自己运行的函数 def fuction()
- 步骤二:创建flask路由,在路由下定义一个需要调度的函数def job_function():,把之前的def fuction()函数放到里面
- 步骤三:创建调度任务函数,def task(),这里需要配置任务调度的方式等参数
- 步骤四:运行任务 task(),主要写在主函数外面
- 步骤五:主函数运行api接口,app.run()
from flask import Flask
from flask_apscheduler import APScheduler
import datetime
def fuction()
print("showmaker")
app = Flask(__name__)
@app.route("/test", methods=['GET','POST'])
def job_function():
fuction()
def task():
scheduler = APScheduler()
scheduler.init_app(app)
# 定时任务的格式
scheduler.add_job(func=job_function, trigger='interval',seconds=3600, id='my_job_id')
scheduler.start()
# 写在main里面,IIS不会运行
task()
if __name__ == "__main__":
app.run(debug=False, port="5005")
2.APScheduler调度方式
APScheduler有三种可以调度任务的方式:
我们的目的就是通过api接口,在终端一直运行这个接口进程,让它自己去调度我们的程序,这里的程序也就是任务。
- 可以选择调度的任务,开始和结束的时间
- 基于一个时间区间,间隔的执行任务
- 一次性任务执行
知道了我们的任务有哪些调度方式,现在了解一下APScheduler的组件:
- triggers: 任务触发器组件,提供任务触发方式
- job stores: 任务商店组件,提供任务保存方式
- executors:任务调度组件,提供任务调度方式
- schedulers: 任务调度组件,提供任务工作方式
triggers: 支持三种任务触发方式
date:
固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建
| 参数 | 说明 |
|---|---|
| run_date | 任务运行日期 |
| timezone | 时区 |
scheduler.add_job(func=job_function, trigger='date',run_date='2021-10-27 13:00:00', id='my_job_id')
interval:
时间间隔触发器,每个一定时间间隔执行一次。
| 参数 | 说明 |
|---|---|
| weeks (int) | 间隔几周 |
| days (int) | 间隔几天 |
| hours (int) | 间隔几小时 |
| minutes (int) | 间隔几分钟 |
| seconds (int) | 间隔多少秒 |
| start_date (datetime 或 str) | 开始日期 |
| end_date (datetime 或 str) | 结束日期 |
scheduler.add_job(func=job_function, trigger='interval',seconds=3600, id='my_job_id')
cron
cron风格的任务触发,这个不常用就不介绍了,我懒
3.2 job stores:支持四种任务存储方式- memory:默认配置任务存在内存中
- mongdb:支持文档数据库存储
- sqlalchemy:支持关系数据库存储
- redis:支持键值对数据库存储
这里如果你的数据库是mysql,我推荐使用sqlalchemy作为数据库,用过create_engine来连接数据库实现数据库操作,比如说删除数据记录,写入数据记录。这里是我自己的例子
from flask import Flask
from flask_apscheduler import APScheduler
import datetime
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import metaData,Table
def delete():
engine = create_engine('mysql+pymysql://用户名:密码@ip:端口号/数据库?charset=gbk')
meta = metaData(bind=engine)
tb_1 = Table('数据表名', meta, autoload=True, autoload_with=engine)
tb_2 = Table('数据表名', meta, autoload=True, autoload_with=engine)
dlt_1=tb_1.delete()
dlt_2=tb_2.delete()
conn = engine.connect()
# 执行delete操作
conn.execute(dlt_1)
conn.execute(dlt_2)
conn.close()
def write(t):
r1=pd.Dataframe()
r3=pd.Dataframe()
engine = create_engine('mysql+pymysql://用户名:密码@ip:端口号/数据库?charset=gbk')
r1.to_sql('数据表名', con=engine, if_exists='append', index=False)
r3.to_sql('数据表名', con=engine, if_exists='append', index=False)
def write_log(buf):
print(buf)
with open('test.txt', 'a') as f:
f.write(buf + "n")
app = Flask(__name__)
@app.route("/test", methods=['GET','POST'])
def job_function():
now_time=datetime.datetime.now()
t=now_time.strftime('%Y-%m-%d')
write_log("update time:"+t)
delete()
write_log("delete data")
write(t)
write_log("write data")
def task():
scheduler = APScheduler()
scheduler.init_app(app)
# 定时任务,每隔10s执行1次
scheduler.add_job(func=job_function, trigger='interval',seconds=60, id='my_job_id')
scheduler.start()
# 写在main里面,IIS不会运行
task()
if __name__ == "__main__":
app.run(debug=False, port="8888")
最后可以在test.txt查看自己的调用的时间
3.3 schedulers:调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用
- BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
- BackgroundScheduler: 当不运行其它框架 的时候使用,并使你的任务在 后台运行
- AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
- GeventScheduler: 和 gevent 框架配套使用
- TornadoScheduler: 和 tornado 框架配套使用
- TwistedScheduler: 和 Twisted 框架配套使用
- QtScheduler: 开发 qt 应用的时候使用



