网站地址:https://www.seebug.org/vuldb/vulnerabilities
首次爬取网站的字段保存到数据库中
对爬虫设置周期启动时间,每12小时爬取一遍最新漏洞信息,基于SSV-ID主键判断是否已在数据库中,如果没有则添加进数据库
每次更新数据库时,若有新数据加入,则发送通知,提醒本次新增漏洞X个;本次使用的提醒方式为QQ邮箱自动发送邮件。
初次尝试:一般网站都有反爬措施,我们先简单访问一下,看能否返回正确的网页内容。根据测试,需要在请求头中添加cookie才可以获取网页内容,并且cookie大约每隔一小时会更新一次,需要手动替换。
def get_html(url):
    """Download ``url`` and return the response body as text.

    The request is sent with a browser User-Agent and a hard-coded session
    cookie; the cookie has to be replaced by hand once the site stops
    accepting it.

    Args:
        url: Address of the page to fetch.

    Returns:
        The decoded response body, or the sentinel string " ERROR " when the
        request fails (connection error, timeout, invalid URL, ...).
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36',
        'cookie': '__jsluid_s=1959ddd5f99a6452630da73422792f8a; Hm_lvt_6b15558d6e6f640af728f65c4a5bf687=1645850145; Hm_lpvt_6b15558d6e6f640af728f65c4a5bf687=1645850176; __jsl_clearance_s=1645856641.619|0|56iA9ACdGE%2FBWB7QuCKGXVNYzi8%3D',
    }
    try:
        r = requests.get(url, timeout=30, headers=headers)
        return r.text
    except requests.RequestException:
        # Only network/HTTP-layer failures map to the sentinel; Ctrl-C and
        # programming errors are no longer silently swallowed.
        return " ERROR "
如何自动更新cookie?经过研究发现:先不带cookie进行requests.get,会得到一段js代码,cookie正是由这段js代码生成的。
def get_html():
    """Fetch the Seebug home page without sending any cookie.

    No custom headers are sent, so the body returned here is whatever the
    site serves to a cookie-less client. The HTTP status is deliberately not
    checked, because the body is what the caller wants to inspect.

    Returns:
        The decoded response body, or the sentinel string " ERROR " when the
        request itself fails (connection error, timeout, ...).
    """
    try:
        r = requests.get('https://www.seebug.org/', timeout=30)
        return r.text
    except requests.RequestException:
        # Only request failures map to the sentinel; Ctrl-C and programming
        # errors are no longer silently swallowed.
        return " ERROR "
# Print the body returned by the cookie-less get_html() request for inspection.
print(get_html())
我们将js代码放入控制台,就得到了cookie的主要构造内容。
后面有时间的话,会针对cookie加密单独写一篇文章,补上这个尾巴,实现自动获取cookie,不然每隔一小时手动替换还是很麻烦的。
这个没啥好说的,使用bs4这个库进行解析,获取到表格内容,针对网页元素进行解析即可。
def get_content(url):
    """Parse the vulnerability table at ``url`` and print one tuple per row.

    Every ``<tr>`` element of the page is inspected. Rows that lack any of
    the expected cells or attributes (for example a table header row) are
    skipped.

    Args:
        url: Address of the listing page to fetch through ``get_html``.
    """
    html = get_html(url)
    soup = BeautifulSoup(html, 'lxml')
    resource = 'https://www.seebug.org/'  # data source
    for row in soup.find_all('tr'):
        try:
            cve_id = row.find('i', attrs={'class': 'fa-id-card'})['data-original-title']  # CVE id
            ssv_id = row.find('a').text.strip()  # SSV id
            submit_time = row.find('td', attrs={'class': 'datetime'}).text.strip()  # submission time
            vul_level = row.find('div', attrs={'class': 'vul-level'})['data-original-title']  # severity
            vul_title = row.find('a', attrs={'class': 'vul-title'}).text.strip()  # vulnerability title
            wea_poc = row.find('i', attrs={'class': 'fa-rocket'})['data-original-title']  # PoC available?
            wea_range = row.find('i', attrs={'class': 'fa-bullseye'})['data-original-title']  # test range available?
            wea_detail = row.find('i', attrs={'class': 'fa-file-text-o'})['data-original-title']  # details available?
            wea_icon = row.find('i', attrs={'class': 'fa-signal'})['data-original-title']  # chart available?
        except (AttributeError, TypeError, KeyError):
            # find() returned None or the attribute is missing: not a data row.
            continue
        wea_exp = '无exp'
        value = (resource, cve_id, ssv_id, submit_time, vul_level, vul_title, wea_poc, wea_range, wea_detail, wea_icon, wea_exp)
        print(value)
数据库
创建数据库、创建表、插入数据库、统计数据库表的爬取数量总数
def create_db():
    """Create the ``test_db`` database on the local MySQL server if missing.

    Connects to ``localhost:3306`` with the credentials hard-coded below.
    The connection is closed even when the statement fails.
    """
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306)
    try:
        with db.cursor() as cursor:
            cursor.execute("Create Database If Not Exists test_db Character Set UTF8")
    finally:
        db.close()
def create_table():
    """Create ``test_table`` inside ``test_db`` if it does not exist yet.

    ``ssv_id`` is the primary key of the table. The connection is closed
    even when the statement fails.
    """
    sql = 'CREATE TABLE IF NOT EXISTS test_table(resource VARCHAr(255), cve_id VARCHAr(255) NOT NULL, ssv_id VARCHAr(255), submit_time VARCHAr(255),vul_level VARCHAr(255),vul_title VARCHAr(255),wea_poc VARCHAr(255),wea_range VARCHAr(255),wea_detail VARCHAr(255),wea_icon VARCHAr(255),wea_exp VARCHAr(255),PRIMARY KEY (ssv_id))'
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db', autocommit=True)
    try:
        with db.cursor() as cursor:
            cursor.execute(sql)
    finally:
        db.close()
def count_line():
    """Return the number of rows currently stored in ``test_table``.

    Returns:
        The row count as an ``int``.
    """
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db', autocommit=True)
    try:
        with db.cursor() as cursor:
            cursor.execute('select count(*) from test_table')
            # COUNT(*) always yields exactly one single-column row.
            (line,) = cursor.fetchone()
    finally:
        # The connection used to be left open on every call.
        db.close()
    return int(line)
def data_insert(value, count):
    """Insert one scraped row into ``test_table``.

    The statement is ``INSERT IGNORE``, so a row whose ``ssv_id`` already
    exists is dropped by the server without raising an error.

    Args:
        value: Sequence of the 11 column values, in the column order of the
            INSERT statement below.
        count: Running row number, used only in the progress messages.
    """
    sql = "INSERT ignore INTO test_table(resource, cve_id, ssv_id, submit_time, vul_level, vul_title, wea_poc, wea_range, wea_detail, wea_icon,wea_exp) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db')
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, value)
        db.commit()
        print('第{}条数据插入完成'.format(count))
    except Exception:
        # Best-effort insert: report the failure and carry on, but let
        # KeyboardInterrupt/SystemExit propagate.
        db.rollback()
        print("第{}条数据插入数据失败".format(count))
    finally:
        # Close the connection even if rollback itself raises.
        db.close()
邮件通知
这里我使用的是QQ邮箱,需要先获取自己邮箱的SMTP授权码。获取方法:打开邮箱,在设置里找到SMTP服务并开启,按提示发送一条短信进行验证,然后把得到的授权码填入代码中的smtp变量即可。
def email(data_end, data_begin):
    """Send a notification mail reporting how many rows were added.

    The mail is sent through ``smtp.qq.com:465`` over SSL. The sender
    address, SMTP credential and recipient are the placeholder values below
    and must be filled in before use.

    Args:
        data_end: Row count after the crawl.
        data_begin: Row count before the crawl.
    """
    number = '发送邮箱'
    smtp = ''  # presumably the QQ Mail SMTP authorization code; fill in before use
    to = '接收邮箱'  # may be a non-QQ address
    mer = MIMEMultipart()
    head = '''
日期:{}
新增漏洞个数:{}
'''.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), data_end - data_begin)
    # NOTE(review): the body is plain text but is sent with the 'html'
    # subtype, so line breaks may collapse in mail clients; confirm intent.
    mer.attach(MIMEText(head, 'html', 'utf-8'))
    mer['Subject'] = '新增漏洞信息'  # mail subject
    mer['From'] = number  # sender
    mer['To'] = to  # recipient
    # The context manager sends QUIT and closes the socket even when login
    # or sending raises.
    with smtplib.SMTP_SSL('smtp.qq.com', 465) as s:
        s.login(number, smtp)
        s.send_message(mer)
    print('成功发送')
整体代码
import requests
from bs4 import BeautifulSoup
import pymysql
import pandas as pd
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
import time
def create_db():
    """Create the ``test_db`` database on the local MySQL server if missing.

    Connects to ``localhost:3306`` with the credentials hard-coded below.
    The connection is closed even when the statement fails.
    """
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306)
    try:
        with db.cursor() as cursor:
            cursor.execute("Create Database If Not Exists test_db Character Set UTF8")
    finally:
        db.close()
def create_table():
    """Create ``test_table`` inside ``test_db`` if it does not exist yet.

    ``ssv_id`` is the primary key of the table. The connection is closed
    even when the statement fails.
    """
    sql = 'CREATE TABLE IF NOT EXISTS test_table(resource VARCHAr(255), cve_id VARCHAr(255) NOT NULL, ssv_id VARCHAr(255), submit_time VARCHAr(255),vul_level VARCHAr(255),vul_title VARCHAr(255),wea_poc VARCHAr(255),wea_range VARCHAr(255),wea_detail VARCHAr(255),wea_icon VARCHAr(255),wea_exp VARCHAr(255),PRIMARY KEY (ssv_id))'
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db', autocommit=True)
    try:
        with db.cursor() as cursor:
            cursor.execute(sql)
    finally:
        db.close()
def count_line():
    """Return the number of rows currently stored in ``test_table``.

    Returns:
        The row count as an ``int``.
    """
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db', autocommit=True)
    try:
        with db.cursor() as cursor:
            cursor.execute('select count(*) from test_table')
            # COUNT(*) always yields exactly one single-column row.
            (line,) = cursor.fetchone()
    finally:
        # The connection used to be left open on every call.
        db.close()
    return int(line)
def data_insert(value, count):
    """Insert one scraped row into ``test_table``.

    The statement is ``INSERT IGNORE``, so a row whose ``ssv_id`` already
    exists is dropped by the server without raising an error.

    Args:
        value: Sequence of the 11 column values, in the column order of the
            INSERT statement below.
        count: Running row number, used only in the progress messages.
    """
    sql = "INSERT ignore INTO test_table(resource, cve_id, ssv_id, submit_time, vul_level, vul_title, wea_poc, wea_range, wea_detail, wea_icon,wea_exp) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='test_db')
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, value)
        db.commit()
        print('第{}条数据插入完成'.format(count))
    except Exception:
        # Best-effort insert: report the failure and carry on, but let
        # KeyboardInterrupt/SystemExit propagate.
        db.rollback()
        print("第{}条数据插入数据失败".format(count))
    finally:
        # Close the connection even if rollback itself raises.
        db.close()
def get_html(url):
    """Download ``url`` and return the response body as text.

    The User-Agent and cookie header values below are placeholders that
    must be replaced with real values before the crawler is run.

    Args:
        url: Address of the page to fetch.

    Returns:
        The decoded response body, or the sentinel string " ERROR " when the
        request fails (connection error, timeout, invalid URL, ...).
    """
    headers = {
        'User-Agent': '浏览器的user-agent',
        'cookie': '网站的cookie',
    }
    try:
        r = requests.get(url, timeout=30, headers=headers)
        return r.text
    except requests.RequestException:
        # Only network/HTTP-layer failures map to the sentinel; Ctrl-C and
        # programming errors are no longer silently swallowed.
        return " ERROR "
def get_content(url):
    """Parse the vulnerability table at ``url`` and store every data row.

    Every ``<tr>`` element of the page is inspected. Rows that lack any of
    the expected cells or attributes (for example a table header row) are
    skipped; each remaining row is printed and passed to ``data_insert``.

    Args:
        url: Address of the listing page to fetch through ``get_html``.
    """
    html = get_html(url)
    soup = BeautifulSoup(html, 'lxml')
    resource = 'https://www.seebug.org/'  # data source
    count = 0
    for row in soup.find_all('tr'):
        try:
            cve_id = row.find('i', attrs={'class': 'fa-id-card'})['data-original-title']  # CVE id
            ssv_id = row.find('a').text.strip()  # SSV id
            submit_time = row.find('td', attrs={'class': 'datetime'}).text.strip()  # submission time
            vul_level = row.find('div', attrs={'class': 'vul-level'})['data-original-title']  # severity
            vul_title = row.find('a', attrs={'class': 'vul-title'}).text.strip()  # vulnerability title
            wea_poc = row.find('i', attrs={'class': 'fa-rocket'})['data-original-title']  # PoC available?
            wea_range = row.find('i', attrs={'class': 'fa-bullseye'})['data-original-title']  # test range available?
            wea_detail = row.find('i', attrs={'class': 'fa-file-text-o'})['data-original-title']  # details available?
            wea_icon = row.find('i', attrs={'class': 'fa-signal'})['data-original-title']  # chart available?
        except (AttributeError, TypeError, KeyError):
            # find() returned None or the attribute is missing: not a data row.
            continue
        wea_exp = '无exp'
        count += 1
        value = (resource, cve_id, ssv_id, submit_time, vul_level, vul_title, wea_poc, wea_range, wea_detail, wea_icon, wea_exp)
        print(value)
        try:
            data_insert(value, count)
        except Exception:
            # Keep the crawl best-effort: a storage failure for one row must
            # not abort the remaining rows, but Ctrl-C still propagates.
            continue
def email(data_end, data_begin):
    """Send a notification mail reporting how many rows were added.

    The mail is sent through ``smtp.qq.com:465`` over SSL. The sender
    address, SMTP credential and recipient are the placeholder values below
    and must be filled in before use.

    Args:
        data_end: Row count after the crawl.
        data_begin: Row count before the crawl.
    """
    number = '发送邮箱'
    smtp = '需要填入自己的smtp'  # presumably the QQ Mail SMTP authorization code
    to = '接收邮箱'  # may be a non-QQ address
    mer = MIMEMultipart()
    head = '''
日期:{}
新增漏洞个数:{}
'''.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), data_end - data_begin)
    # NOTE(review): the body is plain text but is sent with the 'html'
    # subtype, so line breaks may collapse in mail clients; confirm intent.
    mer.attach(MIMEText(head, 'html', 'utf-8'))
    mer['Subject'] = '新增漏洞信息'  # mail subject
    mer['From'] = number  # sender
    mer['To'] = to  # recipient
    # The context manager sends QUIT and closes the socket even when login
    # or sending raises.
    with smtplib.SMTP_SSL('smtp.qq.com', 465) as s:
        s.login(number, smtp)
        s.send_message(mer)
    print('成功发送')
def main(base_url, deep):
    """Crawl the first ``deep`` listing pages below ``base_url``.

    Page URLs are built as ``base_url + '?page=N'`` for N = 1..deep. Each
    page is handed to ``get_content``, followed by a one-second pause.

    Args:
        base_url: Listing URL without the ``page`` query parameter.
        deep: Number of pages to crawl.
    """
    pages = [base_url + '?page=' + str(number) for number in range(1, deep + 1)]
    total = len(pages)
    for index, page_url in enumerate(pages, start=1):
        print("正在爬取第{}|{}页:{}".format(index, total, page_url))
        get_content(page_url)
        time.sleep(1)
if __name__ == '__main__':
    # Make sure the database and table exist before counting rows.
    create_db()
    create_table()
    data_begin = count_line()
    print("运行之前---》数据库表目前存储条数:{}".format(data_begin))
    base_url = 'https://www.seebug.org/vuldb/vulnerabilities'
    deep = 2  # number of listing pages to crawl
    main(base_url, deep)
    data_end = count_line()
    print("运行之后---》数据库表目前存储条数:{}".format(data_end))
    # Send the notification only when this run actually added rows.
    if data_end > data_begin:
        email(data_end, data_begin)
结果展示



