栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

如何加速计算?

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何加速计算?

当模型很大时,如何加速计算(一) python常见并行库 DASK

dask是python的一个并行计算库,可以动态调度资源提供并行计算,并行化的数据集成提供接口给numpy,pandas或者python迭代器,Task Graph 任务图非常清晰,使得开发人员和用户都可以自由地构建复杂的算法,并处理大多数数据工程框架中常见的map/filter/groupby范式难以处理的混乱情况。
图片地址:官方手册

dask示例

我们测试一下在numpy和dask中数据计算的时间。

import time
import numpy as np
import matplotlib.pyplot as plt
import dask
import dask.array as da
data = np.random.normal(loc =10.0 , scale= 0.1,size = (20000,20000))

生成一波固定期望方差的数组data,使用numpy在数据上按某个维度求该维度向量上的均值。

def numpy_compute():
    print(data.mean(axis=0))

同时使用dask对数组进行分块,这里我们每块为1000*1000。

data_p=da.from_array(data,chunks=(1000,1000))

计算dask数组中按某个维度求向量均值。

def dask_compute():
    print(data_p.mean(axis=0).compute()) 

比较二者时间:

def run_time(fun):

    start_time = time.time()
    fun()
    end_time = time.time()

    print("程序运行时间为:{} 秒".format(str(round((end_time - start_time), 1))))
    return end_time - start_time
@run_time
def task():
    numpy_compute()
@run_time
def task():
    dask_compute()

课件dask相比之下运行时间要快一些。

dask的功能远不知与此,还有许多可以并行化的操作。

multiprocessing

multiprocessing是python编写多进程的程序,支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

# 参数
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
- group:分组,实际上很少使用
- target:表示调用对象,你可以传入方法的名字
- name:别名,相当于给这个进程取一个名字
- args:表示被调用对象的位置参数元组,比如target是函数a,他有两个参数m,n,那么args就传入(m, n)即可
- kwargs:表示调用对象的字典
简单使用
import math
import datetime
import multiprocessing as mp
def train_on_parameter(name, param):
    result = 0
    for num in param:
        result += math.sqrt(num * math.tanh(num) / math.log2(num) / math.log10(num))
    return {name: result}
if __name__ == '__main__':
    start_t = datetime.datetime.now()
    num_cores = int(mp.cpu_count())
    print("本地计算机有: " + str(num_cores) + " 核心")
    pool = mp.Pool(num_cores)
    param_dict = {'task1': list(range(10, 30000000)),
                  'task2': list(range(30000000, 60000000)),
                  'task3': list(range(60000000, 90000000)),
                  'task4': list(range(90000000, 120000000)),
                  'task5': list(range(120000000, 150000000)),
                  'task6': list(range(150000000, 180000000)),
                  'task7': list(range(180000000, 210000000)),
                  'task8': list(range(210000000, 240000000))}
    results = [pool.apply_async(train_on_parameter, args=(name, param)) for name, param in param_dict.items()]
    results = [p.get() for p in results]
    end_t = datetime.datetime.now()
    elapsed_sec = (end_t - start_t).total_seconds()
    print("多进程计算 共消耗: " + "{:.2f}".format(elapsed_sec) + " 秒")

运行结果:

线程池
import random
from multiprocessing.pool import Pool
from time import sleep, time
import os
def run(name):
    print("%s子进程开始,进程ID:%d" % (name, os.getpid()))
    start = time()
    sleep(random.choice([1, 2, 3, 4]))
    end = time()
    print("%s子进程结束,进程ID:%d。耗时0.2%f" % (name, os.getpid(), end-start))
if __name__ == "__main__":
    print("父进程开始")
    # 创建多个进程,表示可以同时执行的进程数量。默认大小是CPU的核心数
    p = Pool(8)
    for i in range(10):
        # 创建进程,放入进程池统一管理
        p.apply_async(run, args=(i,))
    # 如果我们用的是进程池,在调用join()之前必须要先close(),并且在close()之后不能再继续往进程池添加新的进程
    p.close()
    # 进程池对象调用join,会等待进程吃中所有的子进程结束完毕再去结束父进程
    p.join()
    print("父进程结束。")
threading

Thread 对象数据属性有name(线程名),ident(线程标识符),daemon(线程是否是守护线程)等。主要对象包括start(),run()和join()等。start表示开始执行该线程,run()定义线程功能,通常在子类中被应用开发者重写,join (timeout=None)表示直到启动的线程终止之前一直挂起,除非给出timeout秒,否则会一直阻塞。

import threading
import time
def read():
    for x in range(5):
        print('在%s,正在听音乐' % time.ctime())
        time.sleep(1.5)
def write():
    for x in range(5):
        print('在%s,正在看电视' % time.ctime())
        time.sleep(1.5)

def main():
    music_threads = []  # 用来存放执行read函数线程的列表
    TV_threads = []  # 用来存放执行write函数线程的列表
    for i in range(1,2):  # 创建1个线程用于read(),并添加到read_threads列表
        t = threading.Thread(target=read) # 执行的函数如果需要传递参数,threading.Thread(target=函数名,args=(参数,逗号隔开))
        music_threads.append(t)
    for i in range(1,2): # 创建1个线程执行write(),并添加到write_threads列表
        t = threading.Thread(target=write) # 执行的函数如果需要传递参数,threading.Thread(target=函数名,args=(参数,逗号隔开))
        TV_threads.append(t)
    for i in range(0,1):  # 启动存放在read_threads和write_threads列表中的线程
        music_threads[i].start()
        TV_threads[i].start()
if __name__ == '__main__':
    main()

本例参考博客

为了让线程更好的封装,可以用threading模块下的Thread类,继承这个类,然后实现run方法,线程就回自动运行run方法中的代码。

import threading
import time
count = 0
class MyThread(threading.Thread):
    def __init__(self , threadName):
        super(MyThread,self).__init__(name=threadName)
    """一旦这个MyThread类被调用,自动的就会运行底下的run方法中的代码,
    因为这个run方法所属的的MyThread类继承了threading.Thread"""
    def run(self):
        global count
        for i in range(100):
            count += 1
            time.sleep(0.3)
            print(self.getName() , count)
for i in range(2):
    MyThread("MyThreadName:" + str(i)).start()
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/461128.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号