最近实习的领导给了我一个任务,需要先下载发行的新股的新股发行公告和配售结果公告,然后再进行处理。因此为了简化流程我把这个获取文件并重命名的过程写了个程序。
如果你需要爬取一个网站,而requests等常见爬虫方式早已被严防死守,你又不想处理复杂的反爬虫机制,那么你需要什么框架呢?
没错!就是Selenium:一个“按键精灵”一样的库,可以模拟人工操作来完成一系列步骤而不被网页发现。
(而且越蠢的方式就越难被发现,毕竟如果你的爬虫并不会对网页造成很大的危害,那么网站也没必要冒着不小心干掉正常用户的风险来处理你)
我会先给出爬虫的核心部分,然后在最后给出整体的代码~(这份代码我已经测试过不少次了,基本解决了因为网速导致的爬取失败的问题)
爬虫的核心部分:
首先是get_document(stock_pool, driver),它的作用是对股票进行一个简单的筛选,然后丢入对应的函数再进行处理。
def get_document(stock_pool, driver):
    """Fetch the announcement PDFs for every stock code in ``stock_pool``.

    Codes are processed in sorted order. A code whose first character is
    ``'6'`` is handed to ``sh`` together with the sse.com.cn announcement URL;
    every other code is handed to ``sz`` together with the szse.cn notice URL.

    Args:
        stock_pool: list of stock codes; it is sorted in place. Each entry is
            stripped of surrounding whitespace and cut to its first six
            characters; blank entries are skipped.
        driver: browser driver forwarded unchanged to ``sz`` / ``sh``.
    """
    sz_web = 'http://www.szse.cn/disclosure/listed/notice/'
    sh_web = 'http://www.sse.com.cn/assortment/stock/list/info/announcement/index.shtml?productId='
    # Put the codes in a predictable order.
    stock_pool.sort()
    print(stock_pool)
    # Iterate over a snapshot: the download helpers append failed codes to
    # ``ud_list``, and a caller may pass that same list in here, in which case
    # looping over the live list would never finish.
    for raw_code in tuple(stock_pool):
        stock = str(raw_code).strip()[:6]
        if not stock:
            # Blank entry, e.g. left behind by a trailing comma in the input.
            continue
        if stock[0] == '6':
            sh(website=sh_web, stock=stock, driver=driver)
        else:
            sz(website=sz_web, stock=stock, driver=driver)
        # Pause between stocks before loading the next page.
        sleep(10)
# Read a comma-separated list of stock codes from the prompt and fetch their documents.
get_document(input('请输入股票代码:').split(','), driver)
然后是sz和sh函数,它们的主要作用是进入存放对应文档的页面。
def sh(website, stock, driver, ud_list=None):
    """Open the announcement page for a '6'-prefixed stock and download its files.

    Args:
        website: URL prefix; ``stock`` is appended to it to form the page URL.
        stock: stock code string.
        driver: Selenium driver used to load the page.
        ud_list: not used by this function. It is optional so that
            ``get_document``, which calls ``sh(website=..., stock=...,
            driver=...)``, no longer fails with a missing-argument TypeError.
    """
    log(f'沪市股票:{stock}')
    # Load the page that lists this stock's announcements.
    driver.get(website + stock)
    driver.implicitly_wait(10)
    sleep(5)
    log('开始搜索文件')
    # Download the matching files.
    sh_download_file(driver, stock)
def sz(website, stock, driver, ud_list=None):
    """Search the notice page for ``stock`` and download its files.

    The stock code is typed into the ``input_code`` box and the first
    suggestion is clicked. If the suggestion cannot be found or clicked the
    function stops there instead of scraping a page that was never filtered
    to this stock.

    Args:
        website: URL of the notice page.
        stock: stock code string typed into the search box.
        driver: Selenium driver used to drive the page.
        ud_list: optional list; when given, ``stock`` is appended to it if the
            stock cannot be selected. Optional so that the keyword call made
            by ``get_document`` (which omits it) does not raise TypeError.
    """
    log(f'深市股票:{stock}')
    # Open the page and type the code into the search box.
    driver.get(website)
    driver.implicitly_wait(10)
    input_code = driver.find_element(By.ID, 'input_code')
    input_code.send_keys(stock)
    sleep(5)
    log('开始搜索文件')
    try:
        result = driver.find_element(By.XPATH, '//*[@id="c-typeahead-menu-1"]/li/a')
        result.click()
    except WebDriverException:
        log('无法找到对应股票!')
        if ud_list is not None:
            ud_list.append(stock)
        return
    sleep(5)
    # Download the matching files.
    sz_download_file(driver, stock)
最后是一些功能函数,用于定位文档所在位置并点击下载:
def sh_loc_file(driver, max_attempts=3):
    """Return the elements with class ``modal_pdf_list`` on the current page.

    ``find_elements`` reports "nothing matched" with an empty list rather than
    an exception, so an empty result is what triggers a retry here. The number
    of attempts is bounded so a page without any such element cannot hang the
    run.

    Args:
        driver: Selenium driver positioned on the announcement page.
        max_attempts: how many times to look before giving up.

    Returns:
        The list of matching elements, or an empty list if none appeared.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            rows = driver.find_elements(By.CLASS_NAME, 'modal_pdf_list')
        except WebDriverException:
            rows = []
        if rows:
            return rows
        if attempt < max_attempts:
            log('未定位到元素!等待3秒后重试~')
            sleep(3)
    return []
def sh_download_file(driver, stock):
    """Download and rename the matching announcements listed for ``stock``.

    Every row returned by ``sh_loc_file`` whose text contains '配售结果' or
    '发行公告' is opened, its PDF is downloaded, and the downloaded file is
    renamed through ``file_rename``. Each new file name is appended to the
    module-level ``done_list``; if no row matched, ``stock`` is appended to
    the module-level ``ud_list``.

    Args:
        driver: Selenium driver positioned on the stock's announcement page.
        stock: stock code, used for the "not found" log entry and ``ud_list``.
    """
    found = False
    for row in sh_loc_file(driver):
        # Read the row text once, before the preview dialog is opened.
        row_text = row.text
        if '配售结果' not in row_text and '发行公告' not in row_text:
            continue
        links = row.find_elements(By.CLASS_NAME, 'pdf-first')
        if not links:
            # Row has no PDF link to click; nothing to download.
            continue
        # Download the file: open the preview, click download, close it.
        links[0].click()
        # NOTE(review): absolute XPaths depend on the exact page layout.
        download_icon = driver.find_element(By.XPATH, '/html/body/div[14]/div/div/div[2]/div/div[1]/div[2]/a[1]')
        close_icon = driver.find_element(By.XPATH, '/html/body/div[14]/div/div/div[1]/button/span')
        download_icon.click()
        close_icon.click()
        sleep(3)
        found = True
        # Build the file name from the space-separated row text
        # (code first, name second).
        fields = row_text.split(' ')
        code = fields[0]
        name = fields[1] if len(fields) > 1 else ''
        # NOTE(review): rows matched via '发行公告' are labelled
        # '发行结果公告'; confirm this naming is intended.
        title = '配售结果公告' if '配售结果' in row_text else '发行结果公告'
        file_name = '【备查】' + code + '_' + name + '_' + title + '.pdf'
        try:
            file_rename(file_name)
        except OSError:
            # Give the download more time, then try once more.
            sleep(10)
            file_rename(file_name)
        # Record the file as done.
        done_list.append(file_name)
    if not found:
        log(f'{stock}未找到对应文件!')
        ud_list.append(stock)
def sz_loc_file(driver, max_attempts=3):
    """Collect the code, name, title and download-icon elements of the notice list.

    ``find_elements`` returns an empty list when nothing matches, so the page
    is treated as "not ready" while no ``text-title-box`` element is found.
    The number of attempts is bounded so an empty result page cannot hang the
    run.

    Args:
        driver: Selenium driver positioned on the notice page.
        max_attempts: how many times to look before giving up.

    Returns:
        A 4-tuple ``(codes, names, title_boxes, download_icons)`` of element
        lists; all four are empty if nothing was located.
    """
    codes, names, title_boxes, download_icons = [], [], [], []
    for attempt in range(1, max_attempts + 1):
        try:
            codes = driver.find_elements(By.CLASS_NAME, 'title-code')
            names = driver.find_elements(By.CLASS_NAME, 'title-name')
            title_boxes = driver.find_elements(By.CLASS_NAME, 'text-title-box')
            download_icons = driver.find_elements(By.CLASS_NAME, 'titledownload-icon')
        except WebDriverException:
            # Discard partial results so the four lists stay aligned.
            codes, names, title_boxes, download_icons = [], [], [], []
        if title_boxes:
            break
        if attempt < max_attempts:
            sleep(3)
    return codes, names, title_boxes, download_icons
def sz_download_file(driver, stock):
    """Download and rename the matching announcements listed for ``stock``.

    Every entry whose title contains '配售结果' or '发行公告' is downloaded
    and renamed through ``file_rename``. Each new file name is appended to the
    module-level ``done_list``; if nothing matched, ``stock`` is appended to
    the module-level ``ud_list``.

    Args:
        driver: Selenium driver positioned on the filtered notice page.
        stock: stock code, used for the "not found" log entry and ``ud_list``.
    """
    found = False
    codes, names, title_boxes, download_icons = sz_loc_file(driver)
    # zip stops at the shortest list, so lists of unequal length cannot
    # cause an IndexError.
    for code_el, name_el, title_el, icon in zip(codes, names, title_boxes, download_icons):
        title_box = title_el.text
        if '配售结果' not in title_box and '发行公告' not in title_box:
            continue
        # Download the file.
        icon.click()
        sleep(10)
        found = True
        # Build the file name.
        code = code_el.text
        name = name_el.text
        # NOTE(review): titles matched via '发行公告' are labelled
        # '发行结果公告'; confirm this naming is intended.
        title = '配售结果公告' if '配售结果' in title_box else '发行结果公告'
        file_name = '【备查】' + code + '_' + name + '_' + title + '.pdf'
        try:
            file_rename(file_name)
        except OSError:
            # Give the download more time, then try once more.
            sleep(10)
            file_rename(file_name)
        # Record the file as done.
        done_list.append(file_name)
    if not found:
        log(f'{stock}未找到对应文件!')
        ud_list.append(stock)
需要注意的坑:
driver需要设置一下默认下载地址和下载方式,因为有的时候chrome会直接在网页里打开pdf而不是下载。
def set_driver():
    """Create a Chrome driver that downloads PDFs into the working folder.

    Returns:
        The ``webdriver.Chrome`` instance built with these preferences.
    """
    chrome_prefs = {
        "download.default_directory": set_work_dir(),  # change the default download folder
        "download.prompt_for_download": False,  # download without asking
        "download.directory_upgrade": True,
        "plugins.always_open_pdf_externally": True,  # do not display PDFs inside Chrome
    }
    chrome_options = Options()
    chrome_options.page_load_strategy = 'normal'
    chrome_options.add_experimental_option('prefs', chrome_prefs)
    return webdriver.Chrome(options=chrome_options)
# Create the configured Chrome driver used by the rest of the script.
driver = set_driver()
代码全文:
import datetime
import os
import time
from time import sleep

import pandas as pd
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
def log(text):
    """Print ``text`` underneath a separator line of 15 dashes."""
    separator = '-' * 15
    print(separator)
    print(text)
def set_work_dir():
    """Return today's working folder, creating it if it does not exist.

    The folder is named after today's date (``YYYY-MM-DD``) and lives in the
    current working directory. A log entry says whether it was created or
    already existed.

    Returns:
        The path of the working folder.
    """
    today = datetime.date.today().isoformat()
    work_dir = os.path.join(os.getcwd(), today)
    if os.path.isdir(work_dir):
        log('文件夹“%s”已存在' % work_dir)
        return work_dir
    os.mkdir(work_dir)
    log('创建文件夹“%s”' % work_dir)
    return work_dir
def set_download_dir():
    """Return the download folder, which is the folder given by ``set_work_dir``."""
    return set_work_dir()
def set_driver():
    """Create a Chrome driver that downloads PDFs into the working folder.

    Returns:
        The ``webdriver.Chrome`` instance built with these preferences.
    """
    chrome_prefs = {
        "download.default_directory": set_work_dir(),  # change the default download folder
        "download.prompt_for_download": False,  # download without asking
        "download.directory_upgrade": True,
        "plugins.always_open_pdf_externally": True,  # do not display PDFs inside Chrome
    }
    chrome_options = Options()
    chrome_options.page_load_strategy = 'normal'
    chrome_options.add_experimental_option('prefs', chrome_prefs)
    return webdriver.Chrome(options=chrome_options)
def file_rename(file_name, directory=None):
    """Rename the most recently modified file in the download folder.

    Polls every three seconds until the newest entry in the folder has a
    ``.pdf`` extension, then renames that entry to ``file_name`` unless a file
    with that name already exists.

    Args:
        file_name: new base name for the downloaded file.
        directory: folder to look in; defaults to the module-level
            ``download_dir``.
    """
    folder = download_dir if directory is None else directory
    new = os.path.join(folder, file_name)
    while True:
        try:
            # Newest entry last; os.path.join keeps this portable instead of
            # hard-coding a backslash separator.
            entries = sorted(
                os.listdir(folder),
                key=lambda fn: os.path.getmtime(os.path.join(folder, fn)),
            )
        except OSError:
            # E.g. an entry disappeared between listdir and getmtime.
            entries = []
        target_file = entries[-1] if entries else ''
        if target_file.lower().endswith('.pdf'):
            break
        print('错误,等待三秒后重试:可能由于【文件未下载完成】或【文件已存在】导致')
        sleep(3)
    old = os.path.join(folder, target_file)
    if os.path.exists(new):
        log('文件已存在:' + new)
        return
    log('找到目标文件,开始改名')
    print('From:' + old)
    print('To:' + new)
    try:
        os.rename(old, new)
    except OSError:
        # As before, a failed rename is reported but not retried.
        print('错误,等待三秒后重试:可能由于【文件未下载完成】或【文件已存在】导致')
def sh_loc_file(driver, max_attempts=3):
    """Return the elements with class ``modal_pdf_list`` on the current page.

    ``find_elements`` reports "nothing matched" with an empty list rather than
    an exception, so an empty result is what triggers a retry here. The number
    of attempts is bounded so a page without any such element cannot hang the
    run.

    Args:
        driver: Selenium driver positioned on the announcement page.
        max_attempts: how many times to look before giving up.

    Returns:
        The list of matching elements, or an empty list if none appeared.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            rows = driver.find_elements(By.CLASS_NAME, 'modal_pdf_list')
        except WebDriverException:
            rows = []
        if rows:
            return rows
        log('未定位到元素:可能由于网页元素未加载导致')
        if attempt < max_attempts:
            sleep(3)
    return []
def sh_download_file(driver, stock):
    """Download and rename the matching announcements listed for ``stock``.

    A row is wanted when its text contains '配售结果' or '发行公告' and
    mentions neither '延迟' nor '推迟'. For each wanted row the PDF is
    downloaded and renamed through ``file_rename``, and the new file name is
    appended to the module-level ``done_list``. If no row matched, ``stock``
    is appended to the module-level ``ud_list``.

    Args:
        driver: Selenium driver positioned on the stock's announcement page.
        stock: stock code, used for the "not found" log entry and ``ud_list``.
    """
    found = False
    for row in sh_loc_file(driver):
        # Read the row text once, before the preview dialog is opened.
        row_text = row.text
        wanted = '配售结果' in row_text or '发行公告' in row_text
        # Both words are tested explicitly: ('延迟' and '推迟') evaluates to
        # '推迟' alone, and the exclusion must apply to both wanted keywords.
        postponed = '延迟' in row_text or '推迟' in row_text
        if not wanted or postponed:
            continue
        links = row.find_elements(By.CLASS_NAME, 'pdf-first')
        if not links:
            # Row has no PDF link to click; nothing to download.
            continue
        # Download the file: open the preview, click download, close it.
        links[0].click()
        # NOTE(review): absolute XPaths depend on the exact page layout.
        download_icon = driver.find_element(By.XPATH, '/html/body/div[14]/div/div/div[2]/div/div[1]/div[2]/a[1]')
        close_icon = driver.find_element(By.XPATH, '/html/body/div[14]/div/div/div[1]/button/span')
        download_icon.click()
        close_icon.click()
        log(f'下载文件:{row_text}')
        sleep(5)
        found = True
        # Build the file name from the space-separated row text
        # (code first, name second).
        fields = row_text.split(' ')
        code = fields[0]
        name = fields[1] if len(fields) > 1 else ''
        # NOTE(review): rows matched via '发行公告' are labelled
        # '发行结果公告'; confirm this naming is intended.
        title = '配售结果公告' if '配售结果' in row_text else '发行结果公告'
        file_name = '【备查】' + code + '_' + name + '_' + title + '.pdf'
        # Rename the downloaded file.
        file_rename(file_name)
        # Record the file as done.
        done_list.append(file_name)
    if not found:
        log(f'{stock}未找到对应文件!')
        ud_list.append(stock)
def sz_loc_file(driver, max_attempts=3):
    """Collect the code, name, title and download-icon elements of the notice list.

    ``find_elements`` returns an empty list when nothing matches, so the page
    is treated as "not ready" while no ``text-title-box`` element is found.
    The number of attempts is bounded so an empty result page cannot hang the
    run.

    Args:
        driver: Selenium driver positioned on the notice page.
        max_attempts: how many times to look before giving up.

    Returns:
        A 4-tuple ``(codes, names, title_boxes, download_icons)`` of element
        lists; all four are empty if nothing was located.
    """
    codes, names, title_boxes, download_icons = [], [], [], []
    for attempt in range(1, max_attempts + 1):
        try:
            codes = driver.find_elements(By.CLASS_NAME, 'title-code')
            names = driver.find_elements(By.CLASS_NAME, 'title-name')
            title_boxes = driver.find_elements(By.CLASS_NAME, 'text-title-box')
            download_icons = driver.find_elements(By.CLASS_NAME, 'titledownload-icon')
        except WebDriverException:
            # Discard partial results so the four lists stay aligned.
            codes, names, title_boxes, download_icons = [], [], [], []
        if title_boxes:
            break
        log('未定位到元素:可能由于网页元素未加载导致')
        if attempt < max_attempts:
            sleep(3)
    return codes, names, title_boxes, download_icons
def sz_download_file(driver, stock):
    """Download and rename the matching announcements listed for ``stock``.

    Every entry whose title contains '配售结果' or '发行公告' is downloaded
    and renamed through ``file_rename``. Each new file name is appended to the
    module-level ``done_list``; if nothing matched, ``stock`` is appended to
    the module-level ``ud_list``.

    Args:
        driver: Selenium driver positioned on the filtered notice page.
        stock: stock code, used for the "not found" log entry and ``ud_list``.
    """
    found = False
    codes, names, title_boxes, download_icons = sz_loc_file(driver)
    # zip stops at the shortest list, so lists of unequal length cannot
    # cause an IndexError.
    for code_el, name_el, title_el, icon in zip(codes, names, title_boxes, download_icons):
        title_box = title_el.text
        if '配售结果' not in title_box and '发行公告' not in title_box:
            continue
        # Download the file.
        icon.click()
        sleep(5)
        found = True
        # Build the file name.
        code = code_el.text
        name = name_el.text
        # NOTE(review): titles matched via '发行公告' are labelled
        # '发行结果公告'; confirm this naming is intended.
        title = '配售结果公告' if '配售结果' in title_box else '发行结果公告'
        file_name = '【备查】' + code + '_' + name + '_' + title + '.pdf'
        file_rename(file_name)
        # Record the file as done.
        done_list.append(file_name)
    if not found:
        log(f'{stock}未找到对应文件!')
        ud_list.append(stock)
def sh(website, stock, driver):
    """Open the announcement page for ``stock`` and download its files.

    Args:
        website: URL prefix; ``stock`` is appended to it to form the page URL.
        stock: stock code string.
        driver: Selenium driver used to load the page.
    """
    log(f'沪市股票:{stock}')
    # Load the page that lists this stock's announcements.
    page_url = website + stock
    driver.get(page_url)
    driver.implicitly_wait(10)
    sleep(5)
    log('开始搜索文件')
    # Hand over to the download step.
    sh_download_file(driver, stock)
def sz(website, stock, driver):
    """Search the notice page for ``stock`` and download its files.

    The stock code is typed into the ``input_code`` box and the first
    suggestion is clicked. If the suggestion cannot be found or clicked, the
    stock is appended to the module-level ``ud_list`` and the function stops
    instead of scraping a page that was never filtered to this stock.

    Args:
        website: URL of the notice page.
        stock: stock code string typed into the search box.
        driver: Selenium driver used to drive the page.
    """
    log(f'深市股票:{stock}')
    # Open the page and type the code into the search box.
    driver.get(website)
    driver.implicitly_wait(10)
    input_code = driver.find_element(By.ID, 'input_code')
    input_code.send_keys(stock)
    sleep(5)
    log('开始搜索文件')
    try:
        result = driver.find_element(By.XPATH, '//*[@id="c-typeahead-menu-1"]/li/a')
        result.click()
    except WebDriverException:
        log('无法找到对应股票!')
        ud_list.append(stock)
        return
    sleep(5)
    # Download the matching files.
    sz_download_file(driver, stock)
def get_document(stock_pool, driver):
    """Fetch the announcement PDFs for every stock code in ``stock_pool``.

    Codes are processed in sorted order. A code whose first character is
    ``'6'`` is handed to ``sh`` together with the sse.com.cn announcement URL;
    every other code is handed to ``sz`` together with the szse.cn notice URL.

    Args:
        stock_pool: list of stock codes; it is sorted in place. Each entry is
            stripped of surrounding whitespace and cut to its first six
            characters; blank entries are skipped.
        driver: browser driver forwarded unchanged to ``sz`` / ``sh``.
    """
    sz_web = 'http://www.szse.cn/disclosure/listed/notice/'
    sh_web = 'http://www.sse.com.cn/assortment/stock/list/info/announcement/index.shtml?productId='
    # Put the codes in a predictable order.
    stock_pool.sort()
    print(stock_pool)
    # Iterate over a snapshot: the download helpers append failed codes to
    # ``ud_list``, and the retry run passes that same list in here, so looping
    # over the live list would never finish.
    for raw_code in tuple(stock_pool):
        stock = str(raw_code).strip()[:6]
        if not stock:
            # Blank entry, e.g. left behind by a trailing comma in the input.
            continue
        if stock[0] == '6':
            sh(website=sh_web, stock=stock, driver=driver)
        else:
            sz(website=sz_web, stock=stock, driver=driver)
        # Pause between stocks before loading the next page.
        sleep(10)
if __name__ == '__main__':
    # Configure and start the browser.
    driver = set_driver()
    try:
        driver.maximize_window()
        # Working folder; file_rename reads this module-level name.
        download_dir = set_download_dir()
        # Unfinished stock codes and finished file names, filled in by the
        # download helpers.
        ud_list = []
        done_list = []
        # Fetch the files.
        get_document(input('请输入股票代码:').split(','), driver)
        log('已获取文件:')
        for file in done_list:
            print(file)
        if ud_list:
            log('以下股票文件未获取!')
            print(ud_list)
            # Retry once from a snapshot. The helpers append to ``ud_list``
            # while running, so the live list must not be the one iterated;
            # clearing it leaves only the codes that fail again.
            retry_pool = list(ud_list)
            ud_list.clear()
            get_document(retry_pool, driver)
    finally:
        # Always close the browser, even when a step above raised.
        driver.quit()
2021-10-20更新
修改了等待文件下载的逻辑,从单纯的等待时间变为检测文件格式,提升了易理解性和鲁棒性。
def file_rename(file_name, directory=None):
    """Rename the most recently modified file in the download folder.

    Instead of waiting a fixed time for the download, this polls every three
    seconds until the newest entry in the folder has a ``.pdf`` extension,
    then renames that entry to ``file_name`` unless a file with that name
    already exists.

    Args:
        file_name: new base name for the downloaded file.
        directory: folder to look in; defaults to the module-level
            ``download_dir``.
    """
    folder = download_dir if directory is None else directory
    new = os.path.join(folder, file_name)
    while True:
        try:
            # Newest entry of the download folder last; os.path.join keeps
            # this portable instead of hard-coding a backslash separator.
            entries = sorted(
                os.listdir(folder),
                key=lambda fn: os.path.getmtime(os.path.join(folder, fn)),
            )
        except OSError:
            # E.g. an entry disappeared between listdir and getmtime.
            entries = []
        target_file = entries[-1] if entries else ''
        if target_file.lower().endswith('.pdf'):
            break
        print('错误,等待三秒后重试:可能由于【文件未下载完成】或【文件已存在】导致')
        sleep(3)
    old = os.path.join(folder, target_file)
    if os.path.exists(new):
        log('文件已存在:' + new)
        return
    log('找到目标文件,开始改名')
    print('From:' + old)
    print('To:' + new)
    try:
        os.rename(old, new)
    except OSError:
        # As before, a failed rename is reported but not retried.
        print('错误,等待三秒后重试:可能由于【文件未下载完成】或【文件已存在】导致')
运行结果展示:
“互联网精神”即:开放、平等、协作、快速、分享
对更多内容感兴趣欢迎关注我的个人公众号:梧承 Book House



