主要思想 业务和公共功能解耦
注意步骤:
1 定义new方法 引入要装饰的类的类对象以及类函数,然后返回inner函数在inner函数中处理验证和调用被装饰类的函数功能。
2 定义检测静态方法
3 在inner静态方法中检测,如果监测没问题则返回被装饰类的类方法,否则调用检测的静态方法
import tornado.ioloop
import tornado.web
user_agents = [
"Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Mobile Safari/537.36"
]
# 工具类,检测请求
class RequestTools:
class UaCheck:
def __new__(cls, func):
# 1 引入要装饰的类的类对象以及类函数
# 然后返回inner方法在inner方法中调用被装饰的类的类函数
cls.func = func
return cls.inner
@staticmethod
def check(handler):
# 2 定义检测函数
return handler.request.headers["User-Agent"]
@staticmethod
def inner(handler):
# 3 运行检测方法 如果检测失败则返回UA错误,如果检测没问题则继续运行原类实例方法
if not __class__.check(handler):
handler.write("UA错误")
else:
return __class__.func(handler)
class FrequencyCheck:
ip_count = {}
def __new__(cls, func):
cls.func = func
return cls.inner
@staticmethod
def check(handler):
ip = handler.request.remote_ip
num = __class__.ip_count.get(ip, 0) + 1
__class__.ip_count[ip] = num
return __class__.ip_count[ip] > 3
@staticmethod
def inner(handler):
if __class__.check(handler):
handler.write("请求频率过高")
else:
return __class__.func(handler)
class LoginHandler(tornado.web.RequestHandler):
# 加检测功能
@RequestTools.UaCheck
@RequestTools.FrequencyCheck
def get(self):
self.write("GET")
# 业务类
app = tornado.web.Application(
[
(r'^/$', LoginHandler)
]
)
# 第二个参数可传本机ip,否则默认127.0.0.1
app.listen(8887, "localhost")
tornado.ioloop.IOLoop.current().start()
函数装饰器
import tornado.ioloop
import tornado.web
user_agents = [
"Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Mobile Safari/537.36"
]
# 工具类,检测请求
class RequestTools:
ip_count = {}
@staticmethod
def ua_check(func):
def check(handler):
return handler.request.headers["User-Agent"]
def inner(handler):
if not check(handler):
handler.finish("UA错误")
else:
return func(handler)
return inner
@staticmethod
def frequency_check(func):
def check(handler):
ip = handler.request.remote_ip
num = RequestTools.ip_count.get(ip, 0) + 1
# 此处__class__ == RequestTools
__class__.ip_count[ip] = num
return RequestTools.ip_count[ip] > 5
def inner(handler):
if check(handler):
handler.finish("请求频率过高")
else:
return func(handler)
return inner
class LoginHandler(tornado.web.RequestHandler):
# 加检测功能
@RequestTools.ua_check
@RequestTools.frequency_check
def get(self):
self.write("GET")
# 业务类
app = tornado.web.Application(
[
(r'^/$', LoginHandler)
]
)
# 第二个参数可传本机ip,否则默认127.0.0.1
app.listen(8887, "localhost")
tornado.ioloop.IOLoop.current().start()
自己的例子
import tornado.ioloop
import tornado.web
user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"]
# 如果访问头信息不在定义的列表中,拒绝访问
class AccessHandler():
def __new__(cls, func):
# print('call', cls)
cls.func = func
return cls.inner
@staticmethod
def get_header(hander):
ua = hander.request.headers['User-Agent']
print(ua)
if ua not in user_agents:
print("not in")
hander.send_error(403)
else:
hander.write('hello world')
@staticmethod
def inner(hander, *args, **kwargs):
AccessHandler.get_header(hander)
return AccessHandler.func(hander)
ipcount = {}
# 访问次数过多,拒绝访问
class LoginHandler(tornado.web.RequestHandler):
@AccessHandler
def get(self, *args, **kwargs):
ip = self.request.remote_ip
# 第一次访问ip值 没有值返回默认值0
num = ipcount.get(ip, 0) + 1
ipcount[ip] = num
if ipcount[ip] > 10:
self.send_error(403)
else:
self.write('正常访问')
app = tornado.web.Application([(r'^/$', AccessHandler),
(r'^/login/$', LoginHandler), ])
# 第二个参数可传本机ip,否则默认127.0.0.1
app.listen(8887, )
tornado.ioloop.IOLoop.current().start()



