栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python 并发学习

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python 并发学习

主要是想看下读写csv文件时,有什么性能提升的方法。

源文件为一个 100 * 10 列的文件,生成方式如下

data = pd.Dataframe(range(100), columns=['0'])

for i in range(10):
    data[f'{i}'] = data['0']
for i in range(4):
    data.to_csv(f'aa_{i}.csv', index=False)

尝试多线程, 多进程, 单纯for 得到如下结论:

import pandas as pd
import time
import threading
import os


def timer(fun):
    def do_thing(*args, **kwargs):
        start = time.time()
        res = fun(*args, **kwargs)
        print(f'{fun.__name__}', time.time() - start)
        return res

    return do_thing


def process(path, out_path):
    data: pd.Dataframe = pd.read_csv(path)
    for i in range(12):
        data = data.append(data)
    data.to_csv(os.path.join(out_path, os.path.basename(path)), index=False)


@timer
def use_thread(paths, out_path):
    thread_list = []

    for path in paths:
        t = threading.Thread(target=process, args=(path, out_path,))
        thread_list.append(t)

    for t in thread_list:
        t.start()

    for t in thread_list:
        t.join()


@timer
def use_process(paths, out_path):
    from multiprocessing import Pool
    pool = Pool(4)

    for path in paths:
        pool.apply_async(process, (path, out_path,))

    pool.close()
    pool.join()


@timer
def only_for(paths, out_path):
    for path in paths:
        process(path, out_path)
    return


def main():
    start = time.time()
    print('这是主线程:{}'.format(threading.current_thread().name))
    import glob
    paths = glob.glob('uu/*.csv')
    out_path1 = 'uu2'
    out_path2 = 'uu1'

    out_path3 = 'uu3'
    os.makedirs(out_path1, exist_ok=True)
    os.makedirs(out_path2, exist_ok=True)
    os.makedirs(out_path3, exist_ok=True)
    use_thread(paths, out_path1)

    only_for(paths, out_path2)

    use_process(paths, out_path3)
    end = time.time()
    print("总共用时{}秒".format((end - start)))


if __name__ == '__main__':
    main()

执行效果:

这是主线程:MainThread
use_thread 10.597554683685303
only_for 11.264868974685669
use_process 7.812675476074219
总共用时29.679144382476807秒

起码pandas read_csv  to_csv个人感觉没啥区别。多进程还是有用的。感觉可能本身for的时候本身就不是阻塞的,所以使用多线程没有效果。但是多进程真正用到了多核,所以有提升。

并发数都设置为4 因为笔记本4核。

如果修改为写parquet 文件

def process(path, out_path):
    data: pd.Dataframe = pd.read_csv(path)
    for i in range(12):
        data = data.append(data)

    out_path = out_path.replace('csv', 'parquet')
    data.to_parquet(os.path.join(out_path, os.path.basename(path)), index=False)

结果如下:

这是主线程:MainThread
use_thread 0.7137601375579834
only_for 1.2037372589111328
use_process 4.564236640930176
总共用时6.484724998474121秒

多进程反而没有用处,反倒是多线程效果更好,但是写parquet 确实省时间。

process中的迭代次数改到 15 ,最终生成单个文件大小为 (3276800, 10)

时间如下, 比之前迭代次数为12 的csv文件时间还短,行数确是  1/8

use_thread 3.9447433948516846
only_for 8.436389446258545
use_process 6.924658298492432
总共用时19.309784650802612秒

由于parquet 文件本身是列存储,看下列更多时的区别,变成横向merge, 迭代次数改为10

def process(path, out_path):
    data: pd.Dataframe = pd.read_csv(path)
    for i in range(10):
        data = pd.merge(data, data, on='0')

    out_path = out_path.replace('csv', 'parquet')
    data.to_parquet(os.path.join(out_path, os.path.basename(path)), index=False)

 从结果看确实列较多区分更明显,但是和多线程效果近似。

这是主线程:MainThread
use_thread 14.182200193405151
only_for 21.06165313720703
use_process 14.702666521072388
总共用时49.95047187805176秒

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/461018.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号