
Scraping Knownsec Seebug vulnerabilities, saving them to a database, and sending scheduled alerts via QQ Mail


Goals

Target site: https://www.seebug.org/vuldb/vulnerabilities

On the first run, scrape the listed fields from the site and save them to the database.

Schedule the crawler to run every 12 hours and fetch the latest vulnerability entries; use the SSV-ID primary key to check whether each record is already in the database, and insert it if it is not.

Whenever an update adds new rows, send a notification reporting how many vulnerabilities were added in that run; here the notification is an email sent automatically via QQ Mail.
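The 12-hour cycle is never actually shown in the code later in this post; one minimal way to get it, sketched here under the assumption that a plain in-process sleep loop is acceptable (the helper `run_periodically` and its bounded `iterations` parameter are my own, added so the demo terminates):

```python
import time

def run_periodically(job, interval_seconds, iterations=None):
    # Call `job`, then sleep `interval_seconds`, repeatedly.  For the real
    # crawler this would be iterations=None (run forever) with
    # interval_seconds = 12 * 3600; `iterations` only bounds the demo.
    count = 0
    while iterations is None or count < iterations:
        job()
        count += 1
        if iterations is None or count < iterations:
            time.sleep(interval_seconds)

# Demo with a stand-in job and a tiny interval:
results = []
run_periodically(lambda: results.append(1), 0.01, iterations=3)
print(len(results))  # 3
```

In production, cron or a scheduling library would be more robust than an in-process loop, but the loop matches the single-script shape of this post.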

First attempt

Most sites have some anti-scraping measures, so we first make a simple request to see whether the correct page content comes back. Testing shows that a cookie must be included to retrieve the page, and the cookie rotates roughly every hour, so it has to be replaced by hand.

def get_html(url):
    # Requires `import requests`; the cookie below rotates roughly hourly.
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36',
            'cookie': '__jsluid_s=1959ddd5f99a6452630da73422792f8a; Hm_lvt_6b15558d6e6f640af728f65c4a5bf687=1645850145; Hm_lpvt_6b15558d6e6f640af728f65c4a5bf687=1645850176; __jsl_clearance_s=1645856641.619|0|56iA9ACdGE%2FBWB7QuCKGXVNYzi8%3D'
        }
        r = requests.get(url, timeout=30, headers=headers)
        return r.text
    except requests.RequestException:
        return " ERROR "

How can the cookie be refreshed automatically? After some digging: a first requests.get made without any cookie returns a snippet of JS code, and the cookie is generated by that JS.

def get_html():
    try:
        # Deliberately send no cookie: the response is the JS snippet
        # that generates the cookie.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'
        }
        r = requests.get('https://www.seebug.org/', timeout=30, headers=headers)
        return r.text
    except requests.RequestException:
        return " ERROR "

print(get_html())

Pasting that JS into the browser console yields the main component of the cookie.

When I have time I will write a separate post on this cookie encryption and close this loose end so the cookie can be fetched automatically; swapping it by hand every hour is a real nuisance.
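As a toy illustration only: if the returned script ended with a literal document.cookie assignment, a regex could pull it out. The real Seebug script is obfuscated and computes the value dynamically, so this does not work against the live site; the `js_body` string below is invented:

```python
import re

# Invented stand-in for the JS returned by the first cookie-less request;
# the real script computes this value rather than stating it literally.
js_body = "document.cookie='__jsl_clearance_s=1645856641.619|0|abc%3D;max-age=3600;path=/';"

m = re.search(r"document\.cookie='([^;']+)", js_body)
cookie = m.group(1) if m else None
print(cookie)  # __jsl_clearance_s=1645856641.619|0|abc%3D
```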

Extracting the main page content

Nothing special here: parse the page with the bs4 library, locate the table, and pull each field out of the row elements.

def get_content(url):
    html = get_html(url)
    soup = BeautifulSoup(html, 'lxml')

    liTags = soup.find_all('tr')

    count = 0
    for li in liTags:
        try:
            resource = 'https://www.seebug.org/'  # source site
            cve_id = li.find('i', attrs={'class': 'fa-id-card'})['data-original-title']  # CVE ID
            ssv_id = li.find('a').text.strip()  # SSV ID
            submit_time = li.find('td', attrs={'class': 'datetime'}).text.strip()  # submission time
            vul_level = li.find('div', attrs={'class': 'vul-level'})['data-original-title']  # severity
            vul_title = li.find('a', attrs={'class': 'vul-title'}).text.strip()  # vulnerability title
            wea_poc = li.find('i', attrs={'class': 'fa-rocket'})['data-original-title']  # PoC available?
            wea_range = li.find('i', attrs={'class': 'fa-bullseye'})['data-original-title']  # test range available?
            wea_detail = li.find('i', attrs={'class': 'fa-file-text-o'})['data-original-title']  # details available?
            wea_icon = li.find('i', attrs={'class': 'fa-signal'})['data-original-title']  # chart available?
            wea_exp = 'no exp'

            count += 1
            value = (resource, cve_id, ssv_id, submit_time, vul_level, vul_title, wea_poc, wea_range, wea_detail, wea_icon, wea_exp)
            print(value)
        except (AttributeError, TypeError, KeyError):
            continue  # header rows and malformed rows lack these fields
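Because the live page needs a valid cookie, the row-parsing logic can be exercised offline against a minimal table row. The markup below is a hypothetical snippet modeled on the selectors used above; every value in it is made up:

```python
from bs4 import BeautifulSoup

# Hypothetical row mimicking the selectors the crawler relies on.
SAMPLE = '''
<table><tbody><tr>
  <td><a href="/vuldb/ssvid-99999">SSV-99999</a></td>
  <td class="datetime">2022-02-26</td>
  <td><div class="vul-level" data-original-title="high"></div></td>
  <td><a class="vul-title" href="#">Example RCE vulnerability</a></td>
  <td><i class="fa-id-card" data-original-title="CVE-2022-0001"></i></td>
</tr></tbody></table>
'''

row = BeautifulSoup(SAMPLE, 'html.parser').find('tr')
ssv_id = row.find('a').text.strip()                 # first <a> is the SSV link
cve_id = row.find('i', attrs={'class': 'fa-id-card'})['data-original-title']
submit_time = row.find('td', attrs={'class': 'datetime'}).text.strip()
vul_level = row.find('div', attrs={'class': 'vul-level'})['data-original-title']
vul_title = row.find('a', attrs={'class': 'vul-title'}).text.strip()
print(ssv_id, cve_id, submit_time, vul_level, vul_title)
```

(html.parser is used here so the demo has no lxml dependency; the selectors are identical.)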
Database

Create the database, create the table, insert rows, and count the total number of scraped rows in the table.

def create_db():
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306)
    cursor = db.cursor()
    cursor.execute("CREATE DATABASE IF NOT EXISTS test_db CHARACTER SET UTF8")
    db.close()

def create_table():
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db', autocommit=True)
    cursor = db.cursor()
    sql = ('CREATE TABLE IF NOT EXISTS test_table('
           'resource VARCHAR(255), cve_id VARCHAR(255) NOT NULL, ssv_id VARCHAR(255), '
           'submit_time VARCHAR(255), vul_level VARCHAR(255), vul_title VARCHAR(255), '
           'wea_poc VARCHAR(255), wea_range VARCHAR(255), wea_detail VARCHAR(255), '
           'wea_icon VARCHAR(255), wea_exp VARCHAR(255), PRIMARY KEY (ssv_id))')
    cursor.execute(sql)
    db.close()
    
def count_line():
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db', autocommit=True)
    sq = 'select count(*) from test_table'
    ss = pd.read_sql(sq, db)
    line = int(ss.values[0][0])  # first (only) cell of the 1x1 result frame
    return line

def data_insert(value, count):
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db')
    cursor = db.cursor()
    sql = ("INSERT IGNORE INTO test_table(resource, cve_id, ssv_id, submit_time, vul_level, vul_title, "
           "wea_poc, wea_range, wea_detail, wea_icon, wea_exp) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
    try:
        cursor.execute(sql, value)
        db.commit()
        print('Row {} inserted'.format(count))
    except pymysql.MySQLError:
        db.rollback()
        print('Row {} failed to insert'.format(count))
    db.close()
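The deduplication relies on INSERT IGNORE bouncing off the ssv_id primary key. The same behavior can be demonstrated offline with the stdlib sqlite3 module as a stand-in for MySQL (SQLite spells the statement INSERT OR IGNORE):

```python
import sqlite3

# In-memory stand-in for the MySQL table: the primary key on ssv_id is
# what makes the duplicate insert a silent no-op.
db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE vulns (ssv_id TEXT PRIMARY KEY, vul_title TEXT)')

rows = [('SSV-1', 'first'), ('SSV-2', 'second'), ('SSV-1', 'duplicate')]
for row in rows:
    db.execute('INSERT OR IGNORE INTO vulns VALUES (?, ?)', row)
db.commit()

count = db.execute('SELECT COUNT(*) FROM vulns').fetchone()[0]
print(count)  # 2 -- the duplicate SSV-1 was skipped
```

Counting rows before and after a crawl, as count_line() does, is then enough to know how many new vulnerabilities arrived.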
Email notification

I use QQ Mail here, which requires your SMTP authorization code. To get it, open the mailbox settings, find the SMTP section, send the verification text message as prompted, and use the code you receive back.

def email(data_end, data_begin):
    number = 'sender address'            # your QQ Mail address
    smtp = 'SMTP authorization code'
    to = 'recipient address'             # may be a non-QQ mailbox
    mer = MIMEMultipart()

    head = '''
Date: {}

New vulnerabilities: {}
'''.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), data_end - data_begin)
    mer.attach(MIMEText(head, 'html', 'utf-8'))
    mer['Subject'] = 'New vulnerability report'  # subject line
    mer['From'] = number                         # sender
    mer['To'] = to                               # recipient
    s = smtplib.SMTP_SSL('smtp.qq.com', 465)
    s.login(number, smtp)
    s.send_message(mer)                          # send the mail
    s.quit()
    print('Sent successfully')
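The message assembly can be sanity-checked without an SMTP connection. `build_message` below is a hypothetical refactor of just the construction step of email() above, so the headers can be inspected before anything is sent:

```python
import time
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def build_message(sender, recipient, new_count):
    # Same assembly as email() above, minus the SMTP send.
    mer = MIMEMultipart()
    body = 'Date: {}\n\nNew vulnerabilities: {}\n'.format(
        time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), new_count)
    mer.attach(MIMEText(body, 'html', 'utf-8'))
    mer['Subject'] = 'New vulnerability report'
    mer['From'] = sender
    mer['To'] = recipient
    return mer

msg = build_message('sender@example.com', 'rcpt@example.com', 3)
print(msg['Subject'])  # New vulnerability report
```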
Full code
import requests
from bs4 import BeautifulSoup
import pymysql
import pandas as pd
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
import time

def create_db():
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306)
    cursor = db.cursor()
    cursor.execute("CREATE DATABASE IF NOT EXISTS test_db CHARACTER SET UTF8")
    db.close()

def create_table():
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db', autocommit=True)
    cursor = db.cursor()
    sql = ('CREATE TABLE IF NOT EXISTS test_table('
           'resource VARCHAR(255), cve_id VARCHAR(255) NOT NULL, ssv_id VARCHAR(255), '
           'submit_time VARCHAR(255), vul_level VARCHAR(255), vul_title VARCHAR(255), '
           'wea_poc VARCHAR(255), wea_range VARCHAR(255), wea_detail VARCHAR(255), '
           'wea_icon VARCHAR(255), wea_exp VARCHAR(255), PRIMARY KEY (ssv_id))')
    cursor.execute(sql)
    db.close()


def count_line():
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db', autocommit=True)
    sq = 'select count(*) from test_table'
    ss = pd.read_sql(sq, db)
    line = int(ss.values[0][0])  # first (only) cell of the 1x1 result frame
    return line


def data_insert(value, count):
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db')
    cursor = db.cursor()
    sql = ("INSERT IGNORE INTO test_table(resource, cve_id, ssv_id, submit_time, vul_level, vul_title, "
           "wea_poc, wea_range, wea_detail, wea_icon, wea_exp) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
    try:
        cursor.execute(sql, value)
        db.commit()
        print('Row {} inserted'.format(count))
    except pymysql.MySQLError:
        db.rollback()
        print('Row {} failed to insert'.format(count))
    db.close()


def get_html(url):
    try:
        headers = {
            'User-Agent': 'your browser User-Agent',
            'cookie': "the site's current cookie"
        }
        r = requests.get(url, timeout=30, headers=headers)
        return r.text
    except requests.RequestException:
        return " ERROR "


def get_content(url):
    html = get_html(url)
    soup = BeautifulSoup(html, 'lxml')

    liTags = soup.find_all('tr')

    count = 0
    for li in liTags:
        try:
            resource = 'https://www.seebug.org/'  # source site
            cve_id = li.find('i', attrs={'class': 'fa-id-card'})['data-original-title']  # CVE ID
            ssv_id = li.find('a').text.strip()  # SSV ID
            submit_time = li.find('td', attrs={'class': 'datetime'}).text.strip()  # submission time
            vul_level = li.find('div', attrs={'class': 'vul-level'})['data-original-title']  # severity
            vul_title = li.find('a', attrs={'class': 'vul-title'}).text.strip()  # vulnerability title
            wea_poc = li.find('i', attrs={'class': 'fa-rocket'})['data-original-title']  # PoC available?
            wea_range = li.find('i', attrs={'class': 'fa-bullseye'})['data-original-title']  # test range available?
            wea_detail = li.find('i', attrs={'class': 'fa-file-text-o'})['data-original-title']  # details available?
            wea_icon = li.find('i', attrs={'class': 'fa-signal'})['data-original-title']  # chart available?
            wea_exp = 'no exp'

            count += 1
            value = (resource, cve_id, ssv_id, submit_time, vul_level, vul_title, wea_poc, wea_range, wea_detail, wea_icon, wea_exp)
            print(value)
            data_insert(value, count)
        except (AttributeError, TypeError, KeyError):
            continue  # header rows and malformed rows lack these fields

def email(data_end, data_begin):
    number = 'sender address'            # your QQ Mail address
    smtp = 'your SMTP authorization code'
    to = 'recipient address'             # may be a non-QQ mailbox
    mer = MIMEMultipart()

    head = '''
Date: {}

New vulnerabilities: {}
'''.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), data_end - data_begin)
    mer.attach(MIMEText(head, 'html', 'utf-8'))
    mer['Subject'] = 'New vulnerability report'  # subject line
    mer['From'] = number                         # sender
    mer['To'] = to                               # recipient
    # send the mail
    s = smtplib.SMTP_SSL('smtp.qq.com', 465)
    s.login(number, smtp)
    s.send_message(mer)
    s.quit()
    print('Sent successfully')

def main(base_url, deep):
    url_list = []
    count = 0
    for i in range(0, deep):
        url_list.append(base_url + '?page=' + str(i + 1))
    for url in url_list:
        count += 1
        print('Crawling page {}/{}: {}'.format(count, len(url_list), url))
        # print(get_html(url))
        get_content(url)
        time.sleep(1)

if __name__ == '__main__':
    create_db()
    create_table()
    data_begin = count_line()
    print('Before run --> rows currently stored: {}'.format(data_begin))
    base_url = 'https://www.seebug.org/vuldb/vulnerabilities'
    deep = 2
    main(base_url, deep)
    data_end = count_line()
    print('After run --> rows currently stored: {}'.format(data_end))
    email(data_end, data_begin)
Results


Source: www.mshxw.com — https://www.mshxw.com/it/743989.html