栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Tornado框架 视图类加装饰器 进行反爬 重复访问攻击验证

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Tornado框架 视图类加装饰器 进行反爬 重复访问攻击验证

类装饰器

主要思想 业务和公共功能解耦

注意步骤:

1 定义new方法 引入要装饰的类的类对象以及类函数,然后返回inner函数在inner函数中处理验证和调用被装饰类的函数功能。

2 定义检测静态方法

3 在inner静态方法中检测,如果监测没问题则返回被装饰类的类方法,否则调用检测的静态方法

import tornado.ioloop
import tornado.web

user_agents = [
    "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Mobile Safari/537.36"
]


# 工具类,检测请求
class RequestTools:
    class UaCheck:

        def __new__(cls, func):
            # 1 引入要装饰的类的类对象以及类函数 
            # 然后返回inner方法在inner方法中调用被装饰的类的类函数
            cls.func = func
            return cls.inner

        @staticmethod
        def check(handler):
            # 2 定义检测函数
            return handler.request.headers["User-Agent"]

        @staticmethod
        def inner(handler):
            # 3 运行检测方法 如果检测失败则返回UA错误,如果检测没问题则继续运行原类实例方法
            if not __class__.check(handler):
                handler.write("UA错误")
            else:
                return __class__.func(handler)

    class FrequencyCheck:
        ip_count = {}

        def __new__(cls, func):
            cls.func = func
            return cls.inner

        @staticmethod
        def check(handler):
            ip = handler.request.remote_ip
            num = __class__.ip_count.get(ip, 0) + 1
            __class__.ip_count[ip] = num
            return __class__.ip_count[ip] > 3

        @staticmethod
        def inner(handler):
            if __class__.check(handler):
                handler.write("请求频率过高")
            else:
                return __class__.func(handler)


class LoginHandler(tornado.web.RequestHandler):
    # 加检测功能
    @RequestTools.UaCheck
    @RequestTools.FrequencyCheck
    def get(self):
        self.write("GET")


# 业务类
app = tornado.web.Application(
    [
        (r'^/$', LoginHandler)
    ]
)

# 第二个参数可传本机ip,否则默认127.0.0.1
app.listen(8887, "localhost")

tornado.ioloop.IOLoop.current().start()
函数装饰器
import tornado.ioloop
import tornado.web

user_agents = [
    "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Mobile Safari/537.36"
]


# 工具类,检测请求
class RequestTools:
    ip_count = {}

    @staticmethod
    def ua_check(func):
        def check(handler):
            return handler.request.headers["User-Agent"]

        def inner(handler):
            if not check(handler):
                handler.finish("UA错误")
            else:
                return func(handler)
        return inner

    @staticmethod
    def frequency_check(func):
        def check(handler):
            ip = handler.request.remote_ip
            num = RequestTools.ip_count.get(ip, 0) + 1
            # 此处__class__ == RequestTools
            __class__.ip_count[ip] = num
            return RequestTools.ip_count[ip] > 5

        def inner(handler):
            if check(handler):
                handler.finish("请求频率过高")
            else:
                return func(handler)
        return inner


class LoginHandler(tornado.web.RequestHandler):

    # 加检测功能
    @RequestTools.ua_check
    @RequestTools.frequency_check
    def get(self):
        self.write("GET")


# 业务类
app = tornado.web.Application(
    [
        (r'^/$', LoginHandler)
    ]
)

# 第二个参数可传本机ip,否则默认127.0.0.1
app.listen(8887, "localhost")

tornado.ioloop.IOLoop.current().start()

自己的例子

import tornado.ioloop
import tornado.web

user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"]


# 如果访问头信息不在定义的列表中,拒绝访问
class AccessHandler():
    def __new__(cls, func):
        # print('call', cls)
        cls.func = func
        return cls.inner

    @staticmethod
    def get_header(hander):
        ua = hander.request.headers['User-Agent']
        print(ua)
        if ua not in user_agents:
            print("not in")
            hander.send_error(403)
        else:
            hander.write('hello world')

    @staticmethod
    def inner(hander, *args, **kwargs):
        AccessHandler.get_header(hander)
        return AccessHandler.func(hander)


ipcount = {}


# 访问次数过多,拒绝访问

class LoginHandler(tornado.web.RequestHandler):
    @AccessHandler
    def get(self, *args, **kwargs):
        ip = self.request.remote_ip
        # 第一次访问ip值 没有值返回默认值0
        num = ipcount.get(ip, 0) + 1
        ipcount[ip] = num
        if ipcount[ip] > 10:
            self.send_error(403)
        else:
            self.write('正常访问')


app = tornado.web.Application([(r'^/$', AccessHandler),
                               (r'^/login/$', LoginHandler), ])

# 第二个参数可传本机ip,否则默认127.0.0.1
app.listen(8887, )

tornado.ioloop.IOLoop.current().start()

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/295958.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号