sanic框架类似flask框架,也是轻量级的Web框架。不同的是sanic原生支持协程(async/await),其响应速度比flask更快。
from sanic import Sanic
from sanic.views import HTTPMethodView
from apscheduler.schedulers.background import BackgroundScheduler
from sanic.response import json
import threading
from apscheduler.executors.pool import ThreadPoolExecutor
import datetime
import asyncio
# Executor table passed to the scheduler: the "default" executor is an
# APScheduler thread pool created with a limit of three worker threads.
executors = {"default": ThreadPoolExecutor(max_workers=3)}
class SanicView(HTTPMethodView):
    """Class-based view that schedules a delayed coroutine job via APScheduler.

    A GET request reads ``name`` and ``task_id`` from the query string and
    queues ``send_sms`` to run about one minute later on the scheduler's
    thread pool. A POST request only prints the submitted ``name`` and
    ``task_id``.
    """

    # Sanic's ``as_view()`` creates a new view instance for every request.
    # Keeping the scheduler on the class means all requests share one
    # scheduler (and one background thread) instead of starting a new,
    # never-stopped scheduler per GET request.
    _shared_scheduler = None

    def __init__(self):
        """Bind ``self.scheduler`` to the class-wide scheduler, creating it on first use."""
        if SanicView._shared_scheduler is None:
            SanicView._shared_scheduler = BackgroundScheduler(executors=executors)
        self.scheduler = SanicView._shared_scheduler

    async def send_sms(self, name, task_id):
        """Coroutine run by the scheduled job; prints its arguments and the current thread."""
        print("你是 ", name, task_id, threading.current_thread())

    def run(self, name, task_id):
        """Synchronous job entry point that drives ``send_sms`` to completion.

        The scheduler's thread pool calls a plain function, so the coroutine
        is executed on a private event loop created in the worker thread.
        The loop is always closed afterwards, and an exception raised by
        ``send_sms`` propagates to the scheduler instead of being dropped.
        """
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(self.send_sms(name, task_id))
        finally:
            # Release the loop's resources even when the coroutine fails.
            loop.close()

    def make_scheduler(self, name, task_id):
        """Queue ``run(name, task_id)`` to fire once, one minute from now.

        ``task_id`` is used as the job id. Because all requests share one
        scheduler, a job that is still pending under the same id is replaced
        rather than raising an id-conflict error.
        """
        run_at = datetime.datetime.now() + datetime.timedelta(minutes=1)
        self.scheduler.add_job(
            func=self.run,
            args=(name, task_id),
            next_run_time=run_at,
            id=task_id,
            replace_existing=True,
        )

    def start_sched(self, name, task_id):
        """Queue the job, print the current job list, and start the scheduler if needed."""
        self.make_scheduler(name, task_id)
        print(self.scheduler.get_jobs())
        # The scheduler is shared, so it may already be running from an
        # earlier request; calling start() twice raises an error.
        if not self.scheduler.running:
            self.scheduler.start()

    async def get(self, request):
        """Handle GET: schedule the delayed job from the query parameters.

        Missing ``name`` / ``task_id`` parameters default to an empty string.
        Returns a JSON body ``{"code": 200, "msg": "success"}``.
        """
        data = request.args
        name = data.get("name", "")
        task_id = data.get("task_id", "")
        self.start_sched(name, task_id)
        return json({"code": 200, "msg": "success"})

    async def post(self, request):
        """Handle POST: print ``name`` and ``task_id`` from the JSON body.

        A missing body or a JSON value that is not an object is treated as
        an empty object, so both fields fall back to an empty string.
        Returns a JSON body ``{"code": 200, "msg": "post success"}``.
        """
        data = request.json
        if not isinstance(data, dict):
            # request.json is None for an empty body and may be a list or
            # scalar for other valid JSON; none of those support .get().
            data = {}
        name = data.get("name", "")
        task_id = data.get("task_id", "")
        print(name, task_id)
        return json({"code": 200, "msg": "post success"})
# Create the Sanic application and register the class-based view under
# the /sanic_test path.
app = Sanic(__name__)
app.add_route(handler=SanicView.as_view(), uri="/sanic_test")

# Start the server on localhost:8000 only when this file is run as a script.
if __name__ == "__main__":
    app.run(host="127.0.0.1", port=8000)
需要注意的是,如果定时任务函数是用async修饰的协程函数,则需要像上面的run方法那样,在一个新的事件循环中执行它。不过低版本的sanic似乎不支持上述处理。如果定时任务是普通(同步)方法,则无需上述处理,直接在定时函数中调用即可。



