- 上海证券交易所爬虫(Python代码实现 金融爬虫实战)
- 分析
- 1、 网址分析
- 2、网页内容分析
- 2.1 解析网页元素
- 代码实现
- 结果
上证e互动 是由上海证券交易所建立、上海证券市场所有参与主体无偿使用的沟通平台,旨在引导和促进上市公司、投资者等各市场参与主体之间的信息沟通,构建集中、便捷的互动渠道。
本文的目标是爬取上证e互动平台上投资者问答任一时段的全部内容。
分析 1、 网址分析上证e互动官网如下:
http://sns.sseinfo.com/qa.do
网页内容如下:
我们需要点击问答模块,然后可填入时间段进行爬取:
选中时段后,还要进行下拉操作,再点击翻页,直到把某时间段内的全部页表爬取完:
2、网页内容分析 2.1 解析网页元素首先定位输入时间段的字框,但这里有个问题,该字框只能以选择的方式进行输入,而不能手动输入,这就不方便我们使用selenium库模拟浏览器输入内容。为了解决这个问题,在网上查了资料,原来是前段源码那里标记了readonly,只要把这个去掉就可以手动输入了。
# 去掉这个才能写入日期(小细节)
js = 'document.getElementById("sdate").removeAttribute("readonly")' # 开始日期
browser.execute_script(js)
js = 'document.getElementById("edate").removeAttribute("readonly")' # 截止日期
browser.execute_script(js)
然后定位问答文本的位置:
为了获取提问用户的地理位置,我们还需先获取到他的个人主页,然后进入他的个人页面进行爬取:
代码实现from selenium import webdriver
from bs4 import BeautifulSoup
import requests
import time
import re
# 根据时间段、翻页爬取
def Due_Time(year, month):
browser = webdriver.Chrome()
url = 'http://sns.sseinfo.com/qa.do'
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36'}
browser.get(url)
# browser.maximize_window() # 将模拟浏览器窗口最大化,以显示登录框
# 去掉这个才能写入日期(小细节)
js = 'document.getElementById("sdate").removeAttribute("readonly")'
browser.execute_script(js)
js = 'document.getElementById("edate").removeAttribute("readonly")'
browser.execute_script(js)
# 按时间段检索
if month == 9:
browser.find_element_by_xpath('//*[@id="sdate"]').send_keys(f'{year}-09-01')
browser.find_element_by_xpath('//*[@id="edate"]').send_keys(f'{year}-10-01')
browser.find_element_by_xpath('//*[@id="container"]/div[1]/div[2]/div[1]/div/div[2]/div[2]').click()
time.sleep(7)
if month == 10 or month == 11:
browser.find_element_by_xpath('//*[@id="sdate"]').send_keys(f'{year}-{month}-01')
browser.find_element_by_xpath('//*[@id="edate"]').send_keys(f'{year}-{month+1}-01')
browser.find_element_by_xpath('//*[@id="container"]/div[1]/div[2]/div[1]/div/div[2]/div[2]').click()
time.sleep(7)
if month == 12:
browser.find_element_by_xpath('//*[@id="sdate"]').send_keys(f'{year}-{month}-01')
browser.find_element_by_xpath('//*[@id="edate"]').send_keys(f'{year}-{month}-31')
browser.find_element_by_xpath('//*[@id="container"]/div[1]/div[2]/div[1]/div/div[2]/div[2]').click()
time.sleep(7)
if month < 9 :
browser.find_element_by_xpath('//*[@id="sdate"]').send_keys(f'{year}-0{month}-01')
browser.find_element_by_xpath('//*[@id="edate"]').send_keys(f'{year}-0{month+1}-01')
browser.find_element_by_xpath('//*[@id="container"]/div[1]/div[2]/div[1]/div/div[2]/div[2]').click()
time.sleep(7)
to_csv = open(f'D:/爬虫客户/上证e互动{year}_{month}.csv', 'w', encoding='utf-8', newline="")
# 提问时间抓不到
l = ['股票名称', '股票代码', '提问人名', '提问人ID', '提问人所在地', '提问文本', '答复时间', '答复文本']
tmpstr = ','.join(l) + 'n'
to_csv.write(tmpstr)
page = 0
while 1:
page += 1
print(page)
# 按日记检索才这样翻页
data = browser.page_source
soup = BeautifulSoup(data, 'html.parser')
pages = soup.select('.right.pagination.hidden a')
position = len(pages)
# 没用,结束不了
# if browser.find_element_by_xpath('// *[ @ id = "pagination"] / a[8]') == None:
# break
Total_pagenum = int(soup.select('.right.pagination.hidden a')[-2].text)
if page >= Total_pagenum:
break
browser.find_element_by_xpath(f'// *[ @ id = "pagination"] / a[{position}]').click()
time.sleep(3)
data = browser.page_source
soup = BeautifulSoup(data, 'html.parser')
Items = soup.select('.m_feed_item')
for item in Items:
try:
l = []
item = str(item)
soup = BeautifulSoup(item, 'html.parser')
# 多了个m_qa就查不到,奇怪,但是源码里本来就有m_qa呀,不知道select怎么匹配的
# look = soup.select('.m_feed_detail m_qa')
Info = soup.select('.m_feed_detail .m_feed_face a')
Text = soup.select('.m_feed_detail .m_feed_cnt .m_feed_txt')
Time = soup.select('.m_feed_detail .m_feed_from span')
# 如果有答复
if len(Info) == 2:
Ques_per_name = Info[0]['title']
Ques_per_id = Info[0]['uid']
Ques_per_url = 'http://sns.sseinfo.com/' + Info[0]['href']
location_url = requests.get(Ques_per_url, timeout=5).text
Location = re.findall('(?<=所在地:).*?(?=[<])', location_url)[0]
item_2 = Ques_per_name
item_3 = Ques_per_id
item_4 = Location
Ques_per_text = Text[0].text
Ques_per_text = Ques_per_text.strip()
Ques_per_text = re.sub('<.*?>', '', Ques_per_text)
Ques_per_text = re.sub(',', '', Ques_per_text)
Ques_per_text = re.sub(' ', '', Ques_per_text)
Ques_per_text = re.sub('n', '', Ques_per_text)
item_0 = re.findall('.*?(?=[(])', Ques_per_text)[0][1:]
item_1 = re.findall('(?<=[(]).*?(?=[)])', Ques_per_text)[0]
Ques_per_text = re.sub('.*?[)]', '', Ques_per_text)
item_5 = Ques_per_text
item_6 = f'{year}年' + Time[0].text
Ans_per_name = Info[1]['title']
Ans_per_id = Info[1]['uid']
Ans_per_context = Text[1].text
Ans_per_context = Ans_per_context.strip()
Ans_per_context = re.sub('<.*?>', '', Ans_per_context)
Ans_per_context = re.sub(',', '', Ans_per_context)
Ans_per_context = re.sub(' ', '', Ans_per_context)
Ans_per_context = re.sub('n', '', Ans_per_context)
item_7 = Ans_per_context
print(str(int(item_1))+f' {Ques_per_name} {Ques_per_id} {Ques_per_text}')
print(f'{Ans_per_name} {Ans_per_id} {Ans_per_context}')
l.append(item_0)
l.append(item_1)
l.append(item_2)
l.append(item_3)
l.append(item_4)
l.append(item_5)
l.append(item_6)
l.append(item_7)
tmpstr = ','.join(l) + 'n'
to_csv.write(tmpstr)
# 如果没回复
if len(Info) == 1:
Ques_per_name = Info[0]['title']
Ques_per_id = Info[0]['uid']
Ques_per_url = 'http://sns.sseinfo.com/' + Info[0]['href']
location_url = requests.get(Ques_per_url, timeout=5).text
Location = re.findall('(?<=所在地:).*?(?=[<])', location_url)[0]
item_2 = Ques_per_name
item_3 = Ques_per_id
item_4 = Location
Ques_per_text = Text[0].text
Ques_per_text = Ques_per_text.strip()
Ques_per_text = re.sub('<.*?>', '', Ques_per_text)
Ques_per_text = re.sub(',', '', Ques_per_text)
Ques_per_text = re.sub(' ', '', Ques_per_text)
Ques_per_text = re.sub('n', '', Ques_per_text)
item_0 = re.findall('.*?(?=[(])', Ques_per_text)[0][1:]
item_1 = re.findall('(?<=[(]).*?(?=[)])', Ques_per_text)[0]
Ques_per_text = re.sub('.*?[)]', '', Ques_per_text)
item_5 = Ques_per_text
# 后面再去测试
item_6 = f'{year}年'+Time[0].text
item_7 = '无'
l.append(item_0)
l.append(item_1)
l.append(item_2)
l.append(item_3)
l.append(item_4)
l.append(item_5)
l.append(item_6)
l.append(item_7)
tmpstr = ','.join(l) + 'n'
to_csv.write(tmpstr)
except:
print('文本异常,忽略这条')
continue
to_csv.close()
if __name__ == '__main__':
Due_Time()
结果



