Python并发编程有三种方式:多线程Thread、多进程Process、多协程Coroutine。
3、全局解释器锁GIL:由于GIL的存在,即使电脑上有多核CPU,单个时刻也只能使用一个核执行Python字节码,因此相比于能够真正并行加速的C++/Java,Python多线程在CPU密集任务上更慢。
2、多线程 1、使用多线程,Python爬虫加速10倍1、创建多线程的方式
join所完成的工作就是线程同步,即主线程任务结束之后进入阻塞状态,一直等待被join的子线程执行结束之后,主线程才会继续往下执行。
2、代码
blog_spider.py
# @Time : 2021/10/25 15:13
# @Author : Li Kunlun
# @Description : 对博客程序进行爬虫
import requests

# The 50 paginated listing URLs of cnblogs.com to be crawled.
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 50 + 1)]


def craw(url):
    """Download *url* and print the URL together with the length of the response body."""
    response = requests.get(url)
    print(url, len(response.text))


if __name__ == '__main__':
    # Example single-page run (kept disabled); sample output:
    # https://www.cnblogs.com/#p1 70243
    # craw(urls[0])
    pass
python01_multi_thread_craw.py
# @Time : 2021/10/25 15:16
# @Author : Li Kunlun
# @Description : 多线程爬虫
import chapter08.spider.blog_spider as blog_spider
import threading
import time
def single_thread():
    """Crawl every URL sequentially on the calling thread."""
    print("single_thread begin")
    for url in blog_spider.urls:
        blog_spider.craw(url)
    print("single_thread end")


def multi_thread():
    """Crawl every URL concurrently, one worker thread per URL."""
    print("multi_thread begin")
    # Create one thread per URL.
    workers = [
        threading.Thread(target=blog_spider.craw, args=(url,))
        for url in blog_spider.urls
    ]
    for worker in workers:
        worker.start()
    # Block until every crawl thread has finished.
    for worker in workers:
        worker.join()
    print("multi_thread end")


if __name__ == '__main__':
    start = time.time()
    single_thread()
    end = time.time()
    # single thread cost: 8.11093521118164 seconds
    print("single thread cost:", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    # multi thread cost: 0.4513895511627197 seconds
    print("multi thread cost:", end - start, "seconds")
    pass
通过控制台进行简单计算:
2、生产者消费者爬虫模式blog_spider.py函数
# @Time : 2021/10/25 15:13
# @Author : Li Kunlun
# @Description : 对博客程序进行爬虫
import requests
from bs4 import BeautifulSoup

# The 50 paginated listing URLs of cnblogs.com to be crawled.
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 50 + 1)]


# Producer: download one page.
def craw(url):
    """Download *url* and return the raw HTML text."""
    r = requests.get(url)
    return r.text


# Consumer: parse one page.
def parse(html):
    """Extract post links from *html*.

    Returns a list of (href, title) tuples, one per post entry.
    """
    soup = BeautifulSoup(html, "html.parser")
    # "a" is the anchor tag; post titles carry the class "post-item-title".
    links = soup.find_all("a", class_="post-item-title")
    return [(link["href"], link.get_text()) for link in links]
    # FIX: removed an unreachable `pass` that followed the return statement.


if __name__ == '__main__':
    # Print every (href, title) found on the third page.
    for result in parse(craw(urls[2])):
        print(result)
python01_producer_consumer_spider.py
# @Time : 2021/10/25 15:16
# @Author : Li Kunlun
# @Description : 生产者和消费者模块
import chapter08.spider02.blog_spider as blog_spider
import threading
import time
import random
import queue
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    """Producer loop: take URLs from *url_queue*, download them, push the HTML onto *html_queue*.

    Runs forever; intended to be executed on a worker thread.
    """
    while True:
        target = url_queue.get()
        page = blog_spider.craw(target)
        # Hand the downloaded HTML over to the parser threads.
        html_queue.put(page)
        print(threading.current_thread().name, f"craw {target}",
              "url_queue.size=", url_queue.qsize())
        # Pause 1-2 seconds between downloads.
        time.sleep(random.randint(1, 2))
def do_parse(html_queue: queue.Queue, fout):
    """Consumer loop: take HTML from *html_queue*, parse it, write each result to *fout*.

    Runs forever; intended to be executed on a worker thread.
    *fout* is a shared writable file object.
    """
    while True:
        html = html_queue.get()
        # Parse the page into (href, title) tuples.
        results = blog_spider.parse(html)
        for result in results:
            # BUG FIX: the original wrote a literal "n" instead of a newline,
            # which produced one unreadable run-on line in data.txt.
            fout.write(str(result) + "\n")
        print(threading.current_thread().name, "results.size=", len(results),
              "html_queue.size=", html_queue.qsize())
        # Pause 1-2 seconds between parses.
        time.sleep(random.randint(1, 2))
if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    # Seed the work queue with every page URL.
    for url in blog_spider.urls:
        url_queue.put(url)

    # Three producer (download) threads.
    for i in range(3):
        worker = threading.Thread(target=do_craw, args=(url_queue, html_queue),
                                  name=f"craw{i}")
        worker.start()

    # Two consumer (parse) threads sharing one output file.
    # NOTE(review): fout is never closed and the worker loops never exit, so
    # the process must be stopped manually — confirm this is intended.
    fout = open("data.txt", "w")
    for i in range(2):
        worker = threading.Thread(target=do_parse, args=(html_queue, fout),
                                  name=f"parse{i}")
        worker.start()
总结:对于一个复杂的爬虫程序,可以将其分成很多个模块,每个模块又可以启动不同的线程组进行处理,线程之间通过队列Queue来进行交互。
3、线程安全问题以及解决方案 4、线程池ThreadPoolExecutor有两种创建线程池的方式。
代码:
# @Time : 2021/10/25 20:29
# @Author : Li Kunlun
# @Description : 线程池测试
import concurrent.futures
import chapter08.py02_spider02.blog_spider as blog_spider

# Stage 1: download every page with a thread pool.
with concurrent.futures.ThreadPoolExecutor() as pool:
    htmls = pool.map(blog_spider.craw, blog_spider.urls)
    htmls = list(zip(blog_spider.urls, htmls))  # pair each URL with its HTML
    for url, html in htmls:
        # map() yields results in submission order, e.g.:
        # https://www.cnblogs.com/#p1 69918
        # https://www.cnblogs.com/#p2 69918
        # ...
        print(url, len(html))
print("craw over")

# Stage 2: parse every page via submit(), consuming futures as they complete.
with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}  # Future -> URL, so a finished future can be traced back
    for url, html in htmls:
        futures[pool.submit(blog_spider.parse, html)] = url

    # Alternative: iterate futures.items() to print in submission order.
    # for future, url in futures.items():
    #     print(url, future.result())

    # as_completed() yields futures in completion order, not submission order.
    for future in concurrent.futures.as_completed(futures):
        url = futures[future]
        print(url, future.result())
5、使用多线程在Web服务器中实现加速
python02_web.py:
# @Time : 2021/10/25 20:52
# @Author : Li Kunlun
# @Description : 使用线程池在web服务中实现加速
import flask
import json
import time

app = flask.Flask(__name__)


def read_file():
    """Simulate a 0.1 s file-read IO operation."""
    time.sleep(0.1)
    return "read_file result"


def read_db():
    """Simulate a 0.2 s database IO operation."""
    time.sleep(0.2)
    return "read_db result"


def read_api():
    """Simulate a 0.3 s remote-API IO operation."""
    time.sleep(0.3)
    return "read_api result"


@app.route("/")
def index():
    """Serve "/" by running the three IO calls sequentially (~0.6 s total)."""
    return json.dumps({
        "result_file": read_file(),
        "result_db": read_db(),
        "result_api": read_api(),
    })


if __name__ == '__main__':
    app.run()
启动后,控制台输出:
浏览器访问该网址:
postman测试查看网络调用消耗的时间:
对代码进行下面的改造(添加线程池):
# @Time : 2021/10/25 20:52
# @Author : Li Kunlun
# @Description : 使用线程池在web服务中实现加速
import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor

app = flask.Flask(__name__)

# Module-level thread pool shared by every request.
pool = ThreadPoolExecutor()


def read_file():
    """Simulate a 0.1 s file-read IO operation."""
    time.sleep(0.1)  # stands in for real IO
    return "read_file result"


def read_db():
    """Simulate a 0.2 s database IO operation."""
    time.sleep(0.2)
    return "read_db result"


def read_api():
    """Simulate a 0.3 s remote-API IO operation."""
    time.sleep(0.3)
    return "read_api result"


@app.route("/")
def index():
    """Serve "/" with the three IO calls overlapped; total ≈ the slowest one (0.3 s)."""
    # submit() returns Future objects rather than the strings themselves.
    file_future = pool.submit(read_file)
    db_future = pool.submit(read_db)
    api_future = pool.submit(read_api)
    return json.dumps({
        "result_file": file_future.result(),
        "result_db": db_future.result(),
        "result_api": api_future.result(),
    })


if __name__ == '__main__':
    app.run()
此时,程序整个运行时间以最长的io操作为准,实现了加速:
相当于3个io操作并发运行,以最长的时间为准。
参考网址:
https://blog.csdn.net/han2529386161/article/details/103592862 https://blog.csdn.net/daijiguo/article/details/78042309 https://blog.csdn.net/qq_35869630/article/details/105876923?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522163516814816780271565862%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=163516814816780271565862&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-5-105876923.pc_search_result_control_group&utm_term=python%E5%A4%9A%E7%BA%BF%E7%A8%8B%E9%81%87%E5%88%B0io%E6%97%B6%E9%87%8A%E6%94%BEgil+%E4%BC%9A%E5%8A%A0%E5%BF%AB%E6%89%A7%E8%A1%8C%E9%80%9F%E5%BA%A6&spm=1018.2226.3001.41873、多进程 1、多进程模块加速程序运行
python01_thread_process_cpu_bound.py:
# @Time : 2021/10/25 21:29
# @Author : Li Kunlun
# @Description : 单线程、多线程、多进程对cpu密集计算速度比较
import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
# 100 copies of the same large prime: a pure-CPU workload with no IO.
prime = [112272535095293] * 100


def is_prime(n):
    """Return True if *n* is prime, using trial division up to floor(sqrt(n))."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    # Only odd candidate divisors from 3 up to floor(sqrt(n)) need checking.
    limit = int(math.floor(math.sqrt(n)))
    return all(n % divisor != 0 for divisor in range(3, limit + 1, 2))
# Baseline: check all 100 numbers on the calling thread.
def single_thread():
    for value in prime:
        is_prime(value)


# Threads: the GIL keeps CPU-bound work serialized, so no speedup is expected.
def multi_thread():
    with ThreadPoolExecutor() as executor:
        executor.map(is_prime, prime)


# Processes: true parallelism across CPU cores.
def multi_process():
    with ProcessPoolExecutor() as executor:
        executor.map(is_prime, prime)


if __name__ == '__main__':
    start = time.time()
    single_thread()
    end = time.time()
    print(f"single_thread cost:", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    print(f"multi_thread cost:", end - start, "seconds")

    start = time.time()
    multi_process()
    end = time.time()
    print(f"multi_process cost:", end - start, "seconds")
"""
程序执行结果:
single_thread cost: 42.00479984283447 seconds
multi_thread cost: 42.01970911026001 seconds
multi_process cost: 9.357584953308105 seconds
"""
2、在Flask服务中使用多进程池加速程序运行
# @Time : 2021/10/25 20:52
# @Author : Li Kunlun
# @Description : 使用进程池在web服务中实现加速
import flask
import json
import time
from concurrent.futures import ProcessPoolExecutor
import math

app = flask.Flask(__name__)


def is_prime(n):
    """Return True if *n* is prime (pure CPU work, no IO)."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


# "numbers" is a single URL path parameter: comma-separated integers.
# BUG FIX: the route was "/is_prime/", which never supplies the view's
# `numbers` argument; Flask needs the <numbers> converter in the rule.
@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
    """Check each comma-separated number and return a JSON {number: bool} map."""
    number_list = [int(x) for x in numbers.split(",")]
    # Fan the CPU-bound checks out across the process pool.
    results = process_pool.map(is_prime, number_list)
    return json.dumps(dict(zip(number_list, results)))


if __name__ == '__main__':
    # The pool must be created under the main guard so that child processes
    # can re-import this module without recursively spawning pools.
    process_pool = ProcessPoolExecutor()
    app.run()
程序执行结果:
4、莫烦Python 1、多线程 1、添加线程
# @Time : 2021/10/26 10:39
# @Author : Li Kunlun
# @Description : 添加线程
import threading


def show():
    """Print a snapshot of the interpreter's current thread state."""
    # Number of currently alive threads.
    print(threading.active_count())
    # Every alive Thread object, e.g.
    # [<_MainThread(MainThread, started 59304)>, ...]
    print(threading.enumerate())  # see the thread list
    # The thread executing this call.
    print(threading.current_thread())


def thread_job():
    print('This is a thread of %s' % threading.current_thread())


def main():
    # target is the callable the new thread will execute.
    worker = threading.Thread(target=thread_job, )
    worker.start()
    show()


if __name__ == '__main__':
    main()
2、join功能
不添加join()的结果:
# @Time : 2021/10/26 10:50
# @Author : Li Kunlun
# @Description : 没有添加join的结果
import threading
import time


def thread_job():
    """Print a start marker, do ~1 s of sleeping 'work', then print a finish marker."""
    # BUG FIX: restored the "\n" escapes that were garbled into a literal "n".
    print("T1 start\n")
    for i in range(10):
        time.sleep(0.1)  # 0.1 s of simulated work per step
    print("T1 finish\n")


added_thread = threading.Thread(target=thread_job, name='T1')
added_thread.start()
# Without join(), the main thread does not wait for T1 before printing.
print("all done\n")
"""
预期输出结果:
T1 start
T1 finish
all done
实际结果:
T1 start
all done
T1 finish
"""
线程任务还未完成便输出all done。如果要遵循顺序,可以在启动线程后对它调用join。
使用join()函数,按照顺序正常输出。
# @Time : 2021/10/26 10:48
# @Author : Li Kunlun
# @Description :join功能测试
# https://blog.csdn.net/nanhuaibeian/article/details/100160953
import threading
import time


def T1_job():
    """Longer task: ~1 s of sleeping work between start/finish markers."""
    print('T1 start\n')
    for i in range(10):
        time.sleep(0.1)
    print('T1 finish\n')


# T2's workload is much smaller than T1's.
def T2_job():
    print('T2 start\n')
    print('T2 finish\n')


def main():
    # BUG FIX: the original passed the undefined name `thread_job` (NameError
    # at call time); the intended target in this script is T1_job.
    added_thread = threading.Thread(target=T1_job, name='T1')
    thread2 = threading.Thread(target=T2_job, name='T2')
    # Start both first, then join, so the two threads overlap.
    added_thread.start()
    thread2.start()
    thread2.join()  # wait for T2 before running the statements below
    added_thread.join()
    print('all done\n')


if __name__ == '__main__':
    main()
"""
可以接收的输出结果(按照顺序进行输出):
T1 start
T2 start
T2 finish
T1 finish
all done
"""
3、队列
# @Time : 2021/10/26 11:02
# @Author : Li Kunlun
# @Description :
import threading
import time
from queue import Queue
# 函数的参数是一个列表l和一个队列q,函数的功能是,
# 对列表的每个元素进行平方计算,将结果保存在队列中
def job(l, q):
    """Square every element of list *l* in place, then put the mutated list on queue *q*.

    The queue stands in for a return value, which thread targets cannot have.
    """
    l[:] = [value ** 2 for value in l]
    q.put(l)
def multithreading():
    """Run job() on four sublists concurrently and collect the results via a Queue."""
    # The Queue carries each thread's result back, replacing a return value.
    q = Queue()
    threads = []
    data = [[1, 2, 3], [3, 4, 5], [4, 4, 4], [5, 5, 5]]
    # Start one worker thread per sublist, remembering each for joining.
    for sub in data:
        worker = threading.Thread(target=job, args=(sub, q))
        worker.start()
        threads.append(worker)
    # Wait for all four workers to finish.
    for worker in threads:
        worker.join()
    # Drain the queue: one squared sublist per finished thread.
    results = [q.get() for _ in range(4)]
    print(results)


if __name__ == '__main__':
    multithreading()
2、多进程
1、queue
# @Time : 2021/10/26 13:55
# @Author : Li Kunlun
# @Description : 存储进程输出
import multiprocessing as mp
"""
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后
再从队列中取出结果, 继续加载运算。
因为:多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果
"""
# 该函数没有返回值,将结果存放到队列中
def job(q):
    """Compute sum(i + i^2 + i^3) for i in [0, 1000) and put the total on queue *q*.

    The queue replaces a return value, which process targets cannot have.
    """
    total = sum(i + i ** 2 + i ** 3 for i in range(1000))
    q.put(total)  # queue
if __name__ == '__main__':
    # Multiprocess-safe queue used to collect each worker's result.
    result_queue = mp.Queue()
    # Two processes running the same task. With a single argument,
    # args needs a trailing comma so it is a one-element tuple.
    p1 = mp.Process(target=job, args=(result_queue,))
    p2 = mp.Process(target=job, args=(result_queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # One result per worker, taken off the queue in completion order.
    res1 = result_queue.get()
    res2 = result_queue.get()
    # Combine the two partial results.
    print(res1 + res2)
2、pool
# @Time : 2021/10/26 14:07
# @Author : Li Kunlun
# @Description : 进程池
import multiprocessing as mp
def job(x):
    """Return the square of *x*."""
    return x ** 2
def multicore():
    """Demonstrate Pool.map vs Pool.apply_async on the same squaring task."""
    # Pool defaults to the machine's CPU count; processes=2 pins it to two workers.
    pool = mp.Pool(processes=2)
    # map(): iterable of inputs in, list of results out.
    print(pool.map(job, range(10)))
    # apply_async(): one argument tuple in, one AsyncResult out.
    single = pool.apply_async(job, (2,))
    print(single.get())
    # To mimic map() with apply_async, submit one call per value.
    async_results = [pool.apply_async(job, (i,)) for i in range(10)]
    print([r.get() for r in async_results])


if __name__ == '__main__':
    multicore()
3、lock
# @Time : 2021/10/26 14:15
# @Author : Li Kunlun
# @Description : 进程锁
import multiprocessing as mp
import time
def job(v, num, l):
    """Add *num* to the shared Value *v* four times, holding lock *l* for the whole run.

    Without the lock, the two processes' read-modify-write sequences on v
    would interleave and the printed values would conflict.
    """
    with l:
        for _ in range(4):
            time.sleep(0.1)
            v.value += num  # update the shared counter
            print(v.value)
def multicore():
    """Start two processes that update one shared int under a common lock."""
    lock = mp.Lock()  # process-shared lock
    shared = mp.Value('i', 0)  # 'i' = signed int, initial value 0
    p1 = mp.Process(target=job, args=(shared, 1, lock))
    p2 = mp.Process(target=job, args=(shared, 3, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()
"""
控制台输出:
1
2
3
4
7
10
13
16
"""



