# Crawler utilities kept for reference: a proxy-IP pool and a User-Agent pool.
import random
import requests
from lxml import etree
import MySQLdb
# import telnetlib
import json
class Tool:
    """Crawler helpers: a rotating User-Agent pool plus a MySQL-backed proxy-IP pool.

    Call ``conn_mysql()`` before any method that touches the database
    (``check_mysql``, ``save_to_mysql``, ``before_save``, ``generate_ip_proxy``)
    and ``close_mysql()`` when done.
    """

    def __init__(self):
        # Pool of User-Agent strings to rotate through.
        self.UA = [
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1",
            "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:6.0) Gecko/20100101 Firefox/6.0",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
            "Opera/9.80 (Windows NT 6.1; U; zh-cn) Presto/2.9.168 Version/11.50",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 2.0.50727; SLCC2; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; Tablet PC 2.0; .NET4.0E)",
            "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; InfoPath.3)",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; .NET4.0E) QQBrowser/6.9.11079.201",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36"
        ]
        # Free proxy-list API; the site itself can also be crawled directly.
        self.url = 'https://ip.jiangxianli.com/api/proxy_ips'

    def generate_user_agent(self):
        """Pick, remember (``self.ua``) and return a random User-Agent string.

        :return: one entry of ``self.UA``
        """
        self.ua = random.choice(self.UA)
        return self.ua

    def get_proxy_ip_html(self):
        """Fetch the proxy records exposed by the site's API.

        :return: list of dicts; each record carries at least the keys
                 ``"ip"``, ``"port"`` and ``"protocol"`` (used by
                 ``save_to_mysql``).
        """
        ua = self.generate_user_agent()
        self.headers = {
            "User-Agent": ua
        }
        # BUG FIX: the original built self.headers but never sent it,
        # so the random UA was never actually used.
        res = requests.get(url=self.url, headers=self.headers)
        res.encoding = res.apparent_encoding
        response = json.loads(res.text)['data']['data']
        return response

    def check_mysql(self):
        """Top up the pool after extraction/deletion: refill when < 5 proxies remain.

        :return: None
        """
        self.cursor.execute('select ip from ip_test')
        number = self.cursor.rowcount
        if number < 5:
            self.save_to_mysql()

    def conn_mysql(self):
        """Open the MySQL connection and cursor used by all DB methods.

        :return: None
        """
        self.conn = MySQLdb.Connection(
            host='localhost',
            user='root',
            password='123456',
            port=3306,
            db='ip'
        )
        self.cursor = self.conn.cursor()

    def close_mysql(self):
        """Close the cursor and the connection.

        :return: None
        """
        self.cursor.close()
        self.conn.close()

    def save_to_mysql(self):
        """Validate each proxy from the API and insert the working, unseen ones.

        :return: None
        """
        ip = self.get_proxy_ip_html()
        for i in ip:
            if self.test_ip(i["ip"], i["port"], i["protocol"]) and self.before_save(i['ip']):
                print(i["ip"])
                print("存入数据库")
                try:
                    self.cursor.execute(
                        r'insert into ip_test values(%s,%s,%s)',
                        (i["ip"], i["protocol"], i["port"])
                    )
                except Exception as e:
                    print(e)
                    # BUG FIX: rollback() lives on the connection, not the
                    # cursor — the original raised AttributeError here.
                    self.conn.rollback()
                    continue
                else:
                    self.conn.commit()

    def test_ip(self, ip, port, http):
        """Check whether a proxy works by requesting our apparent IP through it.

        :param ip: proxy host
        :param port: proxy port
        :param http: proxy protocol ("http"/"https")
        :return: True if icanhazip.com echoes back the proxy's IP within 2s
        """
        try:
            res = requests.get(
                'http://icanhazip.com/',
                proxies={'http': '%s://%s:%s' % (http, ip, port)},
                timeout=2
            )
        except Exception:
            # Timeout / connection error: proxy is unusable.
            return False
        if res.text.strip() == ip:
            print("%s该代理IP可用" % ip)
            return True
        return False

    def before_save(self, ip):
        """Deduplication check before insert.

        :param ip: candidate proxy host
        :return: True if *ip* is not yet stored
        """
        # Parameterized query instead of '%'-interpolated SQL (injection-safe,
        # and handles quoting correctly).
        self.cursor.execute('select * from ip_test where ip=%s', (ip,))
        if self.cursor.fetchone():
            print("数据库中已存在")
            return False
        return True

    def generate_ip_proxy(self):
        """Pick a random working proxy from the DB, pruning dead ones as it goes.

        :return: a requests-style proxies dict, e.g. {"http": "http://1.2.3.4:80"}
        """
        while 1:
            # Refill first, then re-query each round: the original selected
            # once before the loop, so deleted rows lingered in the stale
            # snapshot and freshly inserted rows were never seen; an empty
            # table also crashed random.randint(0, -1).
            self.check_mysql()
            self.cursor.execute('select * from ip_test')
            ip_list = self.cursor.fetchall()
            if not ip_list:
                continue  # pool empty; check_mysql will try to refill next pass
            # Rows are stored as (ip, protocol, port) — see save_to_mysql.
            ip, http, port = random.choice(ip_list)
            print(ip, port, http)
            if self.test_ip(ip, port, http):
                proxy_ip = {
                    http: r"%s://%s:%s" % (http, ip, port)
                }
                print(proxy_ip)
                return proxy_ip
            else:
                # Dead proxy: remove it (parameterized, like before_save).
                self.cursor.execute('delete from ip_test where ip=%s', (ip,))
                self.conn.commit()
if __name__ == '__main__':
    # Demo: grab a working proxy from the pool and fetch our apparent IP.
    t = Tool()
    t.conn_mysql()
    try:
        # t.save_to_mysql()  # uncomment on first run to seed the pool
        proxies = t.generate_ip_proxy()
        res = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=2)
        print(res.text)
    finally:
        # BUG FIX: the original leaked the MySQL connection whenever the
        # proxy lookup or the request raised — always close it.
        t.close_mysql()