工作中的接口需要发送2个参数,url和md5,但是一般情况下md5都是将文件下载后再计算出来的。少量的话可以手动下载文件后,通过md5sum命令计算出来,如果有批量的url,手动计算就不太现实,所以考虑通过程序并发实现md5计算。
1、环境win10 + python3.6.5
2、通过multiprocessing模块的回调功能实现 2.1 思路如果手头有linux或者mac环境的话,可以尝试下多线程效果更明显
- 先遍历指定文件(url.txt),拿到每个待下载的url;每遍历一个url,就调用apply_asyn函数将新的请求提交到Pool中,将url传递给下载函数,下载完成后计算md5;将上一步获得的url和md5拼接完毕,交给回调函数输出到文件
进程池的apply_asyn函数是异步非阻塞的,而且支持回调功能,当指定的任务执行完毕后会自动调用回调函数进行处理,无需再不同的进程间同步数据。
所以在上面的需求中,我们只要拿到文件内容的md5之后,直接传递给回调函数写入另外的文件中即可。
# encoding=utf-8
"""
多进程下载文件并计算md5,写入到指定文件
"""
import os
import time
import requests
import hashlib
from multiprocessing import Pool
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
base_path = os.path.dirname(os.path.realpath(__file__))
def get_md5(content):
"""计算md5"""
hash = hashlib.md5(content)
return hash.hexdigest()
def mycallback(content):
"""将内容追加到指定文件"""
# print(content)
with open(os.path.join(base_path, 'url_md5.txt'), "a+") as fp:
fp.write(content)
def download(url):
"""根据url下载文件,并计算出md5"""
# https不校验证书
res = requests.get(url, verify=False)
# print(res.status_code)
md5 = get_md5(res.content)
content = "%s %sn" % (url.strip(), md5)
return content
if __name__ == "__main__":
start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
print("【%s】开始写入数据" % start)
# 创建一个进程池
pool = Pool()
poolObjects = []
url_file = os.path.join(base_path, 'url.txt')
with open(url_file) as fp:
for url in fp:
url = url.strip()
if not url:
continue
obj = pool.apply_async(download, args=(url,), callback=mycallback)
poolObjects.append(obj)
pool.close()
pool.join()
# 调试用,有问题的话可以看到每个进程的出错信息
for f in poolObjects:
f.get()
end = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
print("【%s】所有数据都写入完成" % end)
url.txt文件内容如下:
https://www.wandoujia.com/apps/596157/download/dot?ch=detail_normal_dl https://www.wandoujia.com/apps/566489/download/dot?ch=detail_normal_dl https://www.wandoujia.com/apps/5919749/download/dot?ch=detail_normal_dl https://www.wandoujia.com/apps/280155/download/dot?ch=detail_normal_dl https://www.wandoujia.com/apps/8301911/download/dot?ch=detail_normal_dl
结果文件:
受制于操作系统的性能,能够开启的进程和线程数都是有限的,而且多进程资源的开销相对来说是最大的,所以各种编程语言对协程这种充分利用单线程资源,支持的也越来越好。因此也尝试下用协程实现下上面的需求。
3.2 代码实现# encoding=utf-8
"""
协程的方式下载文件
"""
import os
import time
import hashlib
import asyncio
import requests
import queue
import threading
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
base_path = os.path.dirname(os.path.realpath(__file__))
def get_md5(content):
"""计算md5"""
hash = hashlib.md5(content)
return hash.hexdigest()
def write_to_file(q):
"""将队列中的内容写入到文件"""
with open(os.path.join(base_path, "result.txt"), "a+") as fp:
while 1:
# 阻塞等待
content = q.get()
if content == "end":
break
fp.write(content)
async def download(q, url):
"""创建协程函数,根据指定url下载文件,并将url和md5写入到队列"""
print("开始下载")
# 获取事件循环
loop = asyncio.get_event_loop()
# requests模块不支持异步,所以用线程池配合实现
future = loop.run_in_executor(None, requests.get, url)
res = await future
#计算md5
url = url.strip()
md5 = get_md5(res.content)
q.put("%s %sn" % (url, md5))
if __name__ == "__main__":
url_list = [
"https://www.wandoujia.com/apps/8301911/download/dot?ch=detail_normal_dl",
"https://www.wandoujia.com/apps/8301911/download/dot?ch=detail_normal_dl"
]
start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
print("【%s】开始写入数据" % start)
q = queue.Queue()
# 开启写入结果的线程
t = threading.Thread(target=write_to_file, args=(q,))
t.start()
tasks = [download(q, url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# 向队列中输出特殊的结束标志
q.put("end")
t.join()
end = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
print("【%s】所有数据都写入完成" % end)
结果和方式一类似。



