并行训练可以分为数据并行和模型并行。
模型并行
模型并行主要应用于模型相比显存来说更大 一块 device 无法加载的场景 通过把模型切割为几个部分 分别加载到不同的 device 上。比如早期的 AlexNet 当时限于显卡 模型就是分别加载在两块显卡上的。
数据并行
这个是日常会应用的比较多的情况。每一个 device 上会加载一份模型 然后把数据分发到每个 device 并行进行计算 加快训练速度。
如果要再细分 又可以分为单机多卡 多机多卡。这里主要讨论数据并行的单机多卡的情况。
2. Pytorch 并行训练常用的 API 有两个
torch.nn.DataParallel(DP)torch.nn.DistributedDataParallel(DDP)DP 相比 DDP 使用起来更友好 代码少 但是 DDP 支持多机多卡 训练速度更快 而且负载相对要均衡一些。所以优先选用 DDP 吧。
2.1 训练模型的过程在开始怎么调用并行的接口之前 了解并行的过程是有必要的。首先来看下模型训练的过程。
2.2 DP 2.2.1 DP 的计算过程DP 并行的具体过程可以参考下图两幅图。
上图清晰的表达了 torch.nn.DataParallel 的计算过程。
将 inputs 从主 GPU 分发到所有 GPU 上将 model 从主 GPU 分发到所有 GPU 上每个 GPU 分别独立进行前向传播 得到 outputs将每个 GPU 的 outputs 发回主 GPU在主 GPU 上 通过 loss function 计算出 loss 对 loss function 求导 求出损失梯度计算得到的梯度分发到所有 GPU 上反向传播计算参数梯度将所有梯度回传到主 GPU 通过梯度更新模型权重不断重复上面的过程 2.2.2 应用API 如下。
torch.nn.DataParallel(module, device_ids None, output_device None, dim 0)
使用非常简单。一句代码就搞定。
net torch.nn.DataParallel(model, device_ids [0, 1, 2])2.3 DDP 2.3.1 DDP 的过程
大体上的过程和 DP 类似 具体可以参考下图。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ydf7YnuO-1632744811950)(pytorch 并行训练.assets/image-20210927190934348.png)]
与 DataParallel 的单进程控制多 GPU 不同 在 distributed 的帮助下 我们只需要编写一份代码 torch 就会自动将其分配给n个进程 分别在 n 个 GPU 上运行。不再有主 GPU 每个 GPU 执行相同的任务。对每个 GPU 的训练都是在自己的过程中进行的。每个进程都从磁盘加载其自己的数据。分布式数据采样器可确保加载的数据在各个进程之间不重叠。损失函数的前向传播和计算在每个 GPU 上独立执行。因此 不需要收集网络输出。在反向传播期间 梯度下降在所有GPU上均被执行 从而确保每个 GPU 在反向传播结束时最终得到平均梯度的相同副本。
2.3.2 应用开始之前需要先熟悉几个概念。
group
即进程组。默认情况下 只有一个组 一个 job 即为一个组 也即一个 world。
当需要进行更加精细的通信时 可以通过 new_group 接口 使用 world 的子集 创建新组 用于集体通信等。
world size
表示全局进程个数。如果是多机多卡就表示机器数量 如果是单机多卡就表示 GPU 数量。
rank
表示进程序号 用于进程间通讯 表征进程优先级。rank 0 的主机为 master 节点。 如果是多机多卡就表示对应第几台机器 如果是单机多卡 由于一个进程内就只有一个 GPU 所以 rank 也就表示第几块 GPU。
local_rank
表示进程内 GPU 编号 非显式参数 由 torch.distributed.launch 内部指定。例如 多机多卡中 rank 3 local_rank 0 表示第 3 个进程内的第 1 块 GPU。
DDP 的应用流程如下
在使用 distributed 包的任何其他函数之前 需要使用 init_process_group 初始化进程组 同时初始化 distributed 包。如果需要进行小组内集体通信 用 new_group 创建子分组创建分布式并行 DistributedDataParallel 模型 DDP(model, device_ids device_ids)为数据集创建 Sampler使用启动工具 torch.distributed.launch 在每个主机上执行一次脚本 开始训练使用 destory_process_group() 销毁进程组1. 添加参数 --local_rank # 每个进程分配一个 local_rank 参数 表示当前进程在当前主机上的编号。例如 rank 2, local_rank 0 表示第 3 个节点上的第 1 个进程。 # 这个参数是torch.distributed.launch传递过来的 我们设置位置参数来接受 local_rank代表当前程序进程使用的GPU标号 parser argparse.ArgumentParser() parser.add_argument( --local_rank , default -1, type int, help node rank for distributed training ) args parser.parse_args() print(args.local_rank)) 2.初始化使用nccl后端 dist.init_process_group(backend nccl ) # When using a single GPU per process and per # DistributedDataParallel, we need to divide the batch size # ourselves based on the total number of GPUs we have device_ids [1,3] ngpus_per_node len(device_ids) args.batch_size int(args.batch_size / ngpus_per_node) #ps 检查nccl是否可用 #torch.distributed.is_nccl_available () 3.使用DistributedSampler #别忘了设置pin_memory true #使用 DistributedSampler 对数据集进行划分。它能帮助我们将每个 batch 划分成几个 partition 在当前进程中只需要获取和 rank 对应的那个 partition 进行训练 train_dataset MyDataset(train_filelist, train_labellist, args.sentence_max_size, embedding, word2id) train_sampler t.utils.data.distributed.DistributedSampler(train_dataset) train_dataloader DataLoader(train_dataset, pin_memory true, shuffle (train_sampler is None), batch_size args.batch_size, num_workers args.workers, sampler train_sampler ) #DataLoader num_workers这个参数决定了有几个进程来处理data loading。0意味着所有的数据都会被load进主进程 #注意 testset不用sampler 4.分布式训练 #使用 DistributedDataParallel 包装模型 它能帮助我们为不同 GPU 上求得的梯度进行 all reduce 即汇总不同 GPU 计算所得的梯度 并同步计算结果 。 #all reduce 后不同 GPU 中模型的梯度均为 all reduce 之前各 GPU 梯度的均值. 注意find_unused_parameters参数 net textCNN(args,vectors t.FloatTensor(wvmodel.vectors)) if args.cuda: # net.cuda(device_ids[0]) net.cuda() if len(device_ids) 1: net torch.nn.parallel.DistributedDataParallel(net,find_unused_parameters True)



