目录
底层原理
select epoll 运行机制
Tornado中的Ioloop模块的作用
获取请求方式
文件上传与展示
通过请求头信息的判断来进行反爬验证
注册功能demo
重定向
用户登录 以及自己设置的错误界面跳转
Tornado 异步服务器端方式
客户端异步请求
Tornado基于epoll,ioloop中运行epoll机制
更好的支持高并发,建立更多的服务器连接
底层原理
IO多路复用
IO多路复用支持三种方式:select,poll,epoll
windowsMac:select
linux:三种都支持
select:最多只能有1024个连接
epoll:连接没有上限
poll是过渡产物
select epoll 运行机制
ioloop中有多个socket对象(绑定了IP地址和端口号)
select不断轮询所有socket对象(无论对象处于什么状态),直到有socket对象达到就绪状态
epoll只会询问所有socket对象一次,并给所有socket对象绑定监听函数,当要建立连接的时候,当某个socket对象达到就绪状态时,回调函数会通知用户,然后建立连接。
另外,当处理类中有堵塞时,直接响应状态,不返回结果,由协程继续执行其他的响应。
Tornado中的Ioloop模块的作用
ioloop就是对I/O多路复用的封装,它实现了一个单例,将这个单例保存在IOLoop._instance中
ioloop实现了Reactor模型,将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。
另外,ioloop还被用来集中运行回调函数以及集中处理定时任务。
学习链接
获取请求方式
get post中使用的方法
import tornado.web
import tornado.ioloop
class IndexHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.render('templates/login.html')
class LoginHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
# 取参数为html中的属性值
# get_arguments 取多个属性值 value属性值不同时使用
uname = self.get_argument('uname')
# get_query_xx只适用于get 请求
pwd = self.get_query_argument('pwd')
print(uname, pwd)
self.write(uname + ',' + pwd)
def post(self, *args, **kwargs):
uname = self.get_body_argument('uname')
print(uname)
self.write(uname)
app = tornado.web.Application([(r'^/$', IndexHandler),
(r'^/login/$', LoginHandler), ])
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
文件上传与展示
import tornado.web
import tornado.ioloop
class UploadHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.render('templates/upload.html')
def post(self, *args, **kwargs):
# 获取上传的文件
# 获取的数据 [{'body':b' xffxxx',"content_type":"image/jpeg,"filename":l.jpg}]
img1 = self.request.files['img1']
# 遍历 img1
for img in img1:
body = img.get('body', '')
content_type = img.get('content_type', '')
filename = img.get('filename', '')
# 将图片存到files目录中
import os
dir = os.path.join(os.getcwd(), "files", filename)
with open(dir, 'wb') as fw:
fw.write(body)
# 将图片显示到浏览器页面中
# 设置响应头信息
self.set_header('Content-Type', content_type)
self.write(body)
app = tornado.web.Application([(r'/upload/', UploadHandler)])
app.listen(8888)
tornado.ioloop.IOLoop.instance().start()
通过请求头信息的判断来进行反爬验证
import tornado.ioloop
import tornado.web
user_agents = [
"Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Mobile Safari/537.36"
]
# 工具类,检测请求
class RequestTools:
class UaCheck:
def __new__(cls, func):
# 1 引入要装饰的类的类对象以及类函数
# 然后返回inner方法在inner方法中调用被装饰的类的类函数
cls.func = func
return cls.inner
@staticmethod
def check(handler):
# 2 定义检测函数
return handler.request.headers["User-Agent"]
@staticmethod
def inner(handler):
# 3 运行检测方法 如果检测失败则返回UA错误,如果检测没问题则继续运行原类实例方法
if not __class__.check(handler):
handler.write("UA错误")
else:
return __class__.func(handler)
class FrequencyCheck:
ip_count = {}
def __new__(cls, func):
cls.func = func
return cls.inner
@staticmethod
def check(handler):
ip = handler.request.remote_ip
num = __class__.ip_count.get(ip, 0) + 1
__class__.ip_count[ip] = num
return __class__.ip_count[ip] > 3
@staticmethod
def inner(handler):
if __class__.check(handler):
handler.write("请求频率过高")
else:
return __class__.func(handler)
class LoginHandler(tornado.web.RequestHandler):
# 加检测功能
@RequestTools.UaCheck
@RequestTools.FrequencyCheck
def get(self):
self.write("GET")
# 业务类
app = tornado.web.Application(
[
(r'^/$', LoginHandler)
]
)
# 第二个参数可传本机ip,否则默认127.0.0.1
app.listen(8887, "localhost")
tornado.ioloop.IOLoop.current().start()
注册功能demo
import tornado.web
import tornado.ioloop
import MySQLdb
def _getConn():
return MySQLdb.connect("localhost", "root", "123zx000", db="Tornado", port=3306)
class RegisterHandler(tornado.web.RequestHandler):
def initialize(self, conn):
self.conn = conn
print(conn)
def get(self, *args, **kwargs):
self.render('templates/register.html')
def post(self, *args, **kwargs):
# 获得请求参数
uname = self.get_argument('uname')
pwd = self.get_argument('pwd')
# 将数据插入到数据库中
try:
cursor = self.conn.cursor()
cursor.execute('insert into t_auth values(null,"%s","%s",now())' % (uname, pwd))
self.conn.commit()
self.write("注册成功")
except:
# 数据库回滚
self.conn.rollback()
self.redirect('/register/')
# 数据库连接对象放在此处 会自动传递给 initialize方法中的conn
app = tornado.web.Application([(r'^/register/$', RegisterHandler, {"conn": _getConn()})])
app.listen(8888)
tornado.ioloop.IOLoop.instance().start()
重定向
客户端发出请求,服务器给客户端一个返回的ip地址和302状态码
客户端再向访问新ip地址
import tornado.web
from tornado.web import RedirectHandler
import tornado.ioloop
class IndexHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
# 第一种方法 重定向 302
# self.redirect('https://www.baidu.com')
# 方法2
self.set_status(301)
self.set_header("Location", "https://www.jd.com")
app = tornado.web.Application(
[(r'^/1/$', IndexHandler),
# 方法3
(r'^/red3/$', RedirectHandler, {"url": "https://www.taobao.com"}), ])
app.listen(8889)
tornado.ioloop.IOLoop.instance().start()
用户登录 以及自己设置的错误界面跳转
import tornado.web
import tornado.ioloop
import MySQLdb
class LoginHandler(tornado.web.RequestHandler):
def initialize(self, conn):
self.conn = conn
# 收到请求 就会执行此方法 可接收参数
def prepare(self):
# 判断当前请求方式
if self.request.method == "POST":
# 获取请求参数
self.uname = self.get_argument("uname")
self.pwd = self.get_argument("pwd")
print()
def get(self, *args, **kwargs):
# print(1)
self.render("templates/login.html")
def post(self, *args, **kwargs):
1 / 0
cursor = self.conn.cursor()
cursor.execute('select * from t_auth where uname="%s" and pwd="%s"' % (self.uname, self.pwd))
user = cursor.fetchone()
print(user)
if user:
self.write(u"登录成功")
else:
self.write(u"登录失败")
# 如果访问遇到错误,跳转到指定的错误页面
def write_error(self, status_code, **kwargs):
self.render('templates/error.html')
# 设置服务器信息
def set_default_headers(self):
self.set_header('Server', 'SXTServer')
settings = {'debug': True}
dbconfig = {
'host': '127.0.0.1', 'user': 'root',
'password': "123zx000", 'db': 'Tornado', "port": 3306}
app = tornado.web.Application(
[(r'^/login/$', LoginHandler, {'conn': MySQLdb.connect(**dbconfig)}),
], **settings)
app.listen(8887)
tornado.ioloop.IOLoop.instance().start()
cookie
遇到bug 加了expire 设置cookie后无法获取
import tornado.web
import tornado.ioloop
class SetcookieHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
self.set_secure_cookie("name", "zhangsan")
# self.set_cookie("hello", "zhangsan")
class GetcookieHandler(tornado.web.RequestHandler):
def get(self, *args, **kwargs):
print(self.request.cookies)
# name = self.get_cookie("hello")
name = self.get_secure_cookie("name")
self.write(name)
# 加密
settings = {"cookie_secret": "abcdefg"}
app = tornado.web.Application([
(r'^/$', SetcookieHandler),
(r'/getcookie/$', GetcookieHandler)], **settings)
app.listen(8886)
tornado.ioloop.IOLoop.instance().start()
Tornado 异步服务器端方式
Tornado 6.0之前支持asynchronous 之后只支持coroutine(协程)
import os
from tornado.concurrent import Future
from tornado.gen import coroutine
from tornado.web import RequestHandler, Application
from tornado.ioloop import IOLoop
class IndexHandler(RequestHandler):
@coroutine
def get(self, filename):
# 把耗时的操作交给回调函数进行异步处理
content = yield self.readImg(filename)
if not content:
self.write_error(404)
else:
self.set_header("Content-Type", 'image/png')
self.write(content)
def readImg(self, filename):
baseDir = os.path.join(os.getcwd(), 'static', filename)
print(baseDir)
with open(baseDir, 'rb') as fr:
content = fr.read()
# 相当于生成器中的send方法,设定上一次的yield语句返回的值
future = Future()
future.set_result(content)
return future
app = Application([(r'^/static/(.*)$', IndexHandler)])
app.listen(8000)
IOLoop.instance().start()
客户端异步请求
可能是版本更改导致
asyncClient.fetch(url, callback) 未正常使用
import os
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
# 在控制台打印页面信息
def parse(con):
import bs4
bs = bs4.BeautifulSoup(con, 'html.parser')
h4List = [h4.text for h4 in bs.select('ul.foot_nav.main h4')]
for h in h4List:
print(h)
def handle_response(response):
# 获取页面内容
# print(221)
content = response.body()
# print(content)
# 写入到index.html页面中
with open(os.path.join(os.getcwd(), 'templates', 'index.html'), 'wb') as fw:
fw.write(content)
# 解析文档信息打印相关内容到控制台
parse(content)
def loadPage(url, callback):
# 创建异步客户端
asyncClient = AsyncHTTPClient()
# 获取页面内容
asyncClient.fetch(url, callback)
# print(1111)
loadPage('http://www.bjsxt.com', handle_response)
IOLoop.instance().start()
Websocket 的demo
from tornado.web import RequestHandler, Application
from tornado.ioloop import IOLoop
from tornado.websocket import WebSocketHandler
import os
class IndexHander(RequestHandler):
def get(self, *args, **kwargs):
self.render('index.html')
class SockHandler(WebSocketHandler):
def open(self, *args, **kwargs):
print(u'建立服务器连接')
def on_message(self, message):
print(u'收到客户端的消息:%s' % message)
self.write_message('hello client')
def on_close(self):
print(u'断开服务器连接')
# 允许跨域请求
def check_origin(self, origin):
return True
app = Application([(r'^/$', IndexHander),
(r'^/websocket/$', SockHandler)], template_path=os.path.join(os.getcwd(), "templates"))
app.listen(8001)
IOLoop.instance().start()
Title
据目前的理解,tornado框架的异步非堵塞的实现主要依赖于ioloop模块,此模块中主要是epoll机制 操作支持asyncio异步,下面是asyncio的介绍
我的大略理解:归根结底,python还是一个线程来执行任务,是用协程来实现的高并发,如遇到耗时操作,主线程并未等待,而是去执行EventLoop(循环队列)中其他可以执行的coroutine(协程任务)了,因此可以实现并发执行。
asyncio提供了完善的异步IO支持;
异步操作需要在coroutine中通过yield from完成;
多个coroutine可以封装成一组Task然后并发执行。
asyncio学习链接
学习链接:
知乎Tornado
Tornado框架



