目录:普通单卡训练-GPU、普通单卡训练-CPU、分布式训练-GPU、租GPU服务器相关
普通单卡训练-GPU
以下代码示例基于:在MNIST数据集上训练一个简单CNN网络,将其改成分布式训练。
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
class ConvNet(nn.Module):
    """Small two-stage convolutional classifier for 1-channel 28x28 images.

    Each stage is Conv2d(5x5, padding 2) -> BatchNorm2d -> ReLU -> MaxPool2d(2),
    so the spatial size is halved per stage (28 -> 14 -> 7) while the channel
    count grows 1 -> 16 -> 32. The flattened 7*7*32 features feed one linear
    layer that produces the class scores.

    Args:
        num_classes: Number of output classes (size of the final layer).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        # 7*7*32 is the flattened size of layer2's output for a 28x28 input.
        self.fc = nn.Linear(7 * 7 * 32, num_classes)

    def forward(self, x):
        """Return unnormalised class scores of shape (batch, num_classes)."""
        out = self.layer1(x)
        out = self.layer2(out)
        # Flatten everything except the batch dimension for the linear layer.
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out
def train(gpu, args):
    """Train ConvNet on the MNIST training split using one CUDA device.

    Args:
        gpu: Index of the CUDA device to train on. Progress lines and the
            total duration are printed only when this is 0.
        args: Parsed command-line namespace; only ``args.epochs`` is read.
    """
    torch.manual_seed(0)
    net = ConvNet()
    torch.cuda.set_device(gpu)
    net.cuda(gpu)
    samples_per_batch = 100

    # Loss function and optimizer.
    loss_fn = nn.CrossEntropyLoss().cuda(gpu)
    sgd = torch.optim.SGD(net.parameters(), 1e-4)

    # MNIST training split stored under ./data (download=True).
    mnist_train = torchvision.datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True,
    )
    batches = torch.utils.data.DataLoader(
        dataset=mnist_train,
        batch_size=samples_per_batch,
        shuffle=True,
        num_workers=0,
        pin_memory=True,
    )

    started_at = datetime.now()
    steps_per_epoch = len(batches)
    is_reporter = gpu == 0
    for epoch_idx in range(args.epochs):
        for step, (images, labels) in enumerate(batches, start=1):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)

            # Forward pass.
            loss = loss_fn(net(images), labels)

            # Backward pass and parameter update.
            sgd.zero_grad()
            loss.backward()
            sgd.step()

            # Report every 100th step.
            if step % 100 == 0 and is_reporter:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch_idx + 1,
                    args.epochs,
                    step,
                    steps_per_epoch,
                    loss.item()))
    if is_reporter:
        print("Training complete in: " + str(datetime.now() - started_at))
def main():
    """Parse the command-line options and start training on GPU 0.

    The node/GPU options are accepted but not used by this single-GPU
    version; only ``--epochs`` is read by ``train``.
    """
    option_table = (
        (('-n', '--nodes'),
         dict(default=1, type=int, metavar='N')),
        (('-g', '--gpus'),
         dict(default=1, type=int, help='number of gpus per node')),
        (('-nr', '--nr'),
         dict(default=0, type=int, help='ranking within the nodes')),
        (('--epochs',),
         dict(default=2, type=int, metavar='N',
              help='number of total epochs to run')),
    )
    cli = argparse.ArgumentParser()
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    train(0, cli.parse_args())


if __name__ == '__main__':
    main()
普通单卡训练-CPU
以上代码在没有GPU的环境中运行时会报错:AssertionError: Torch not compiled with CUDA enabled。
在程序开始处加上:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
再把代码中其余出现.cuda()的地方(模型、损失函数以及每个batch的数据)都改成.to(device),就可以在没有GPU的环境中运行了;改完以后就是下面的“普通单卡训练-CPU”版本。
CPU单卡训练完整代码如下
# mnist-cpu.py
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
# Use the CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 定义一个简单的CNN模型处理MNIST数据
class ConvNet(nn.Module):
    """Small two-stage convolutional classifier for 1-channel 28x28 images.

    Each stage is Conv2d(5x5, padding 2) -> BatchNorm2d -> ReLU -> MaxPool2d(2),
    so the spatial size is halved per stage (28 -> 14 -> 7) while the channel
    count grows 1 -> 16 -> 32. The flattened 7*7*32 features feed one linear
    layer that produces the class scores.

    Args:
        num_classes: Number of output classes (size of the final layer).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        # 7*7*32 is the flattened size of layer2's output for a 28x28 input.
        self.fc = nn.Linear(7 * 7 * 32, num_classes)

    def forward(self, x):
        """Return unnormalised class scores of shape (batch, num_classes)."""
        out = self.layer1(x)
        out = self.layer2(out)
        # Flatten everything except the batch dimension for the linear layer.
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out
# 训练部分主函数
def train(gpu, args):
    """Train ConvNet on the MNIST training split on the module-level ``device``.

    The model, the loss and every batch are all moved to ``device`` so the
    same code runs on a CPU-only machine and on a machine with CUDA.

    Args:
        gpu: Process/device index. It is only used to decide whether this
            call prints progress (printing happens when it is 0).
        args: Parsed command-line namespace; only ``args.epochs`` is read.
    """
    # The model must live on the same device as the batches; leaving it on
    # the CPU while the inputs go to CUDA raises a device-mismatch error.
    model = ConvNet().to(device)
    batch_size = 100

    # Loss function and optimizer.
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)

    # MNIST training split stored under ./data (download=True).
    train_dataset = torchvision.datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True,
    )
    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=0,
        # Pinned host memory only helps host-to-GPU copies; requesting it
        # without CUDA just triggers a warning.
        pin_memory=device.type == 'cuda',
    )

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)

            # Forward pass.
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Report every 100th step.
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1,
                    args.epochs,
                    i + 1,
                    total_step,
                    loss.item()))
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))
# 主函数main()接受参数,执行训练
def main():
    """Parse the command-line options and run training with index 0.

    The node/GPU options are accepted but not used by this single-device
    version; only ``--epochs`` is read by ``train``.
    """
    option_table = (
        (('-n', '--nodes'),
         dict(default=1, type=int, metavar='N')),
        (('-g', '--gpus'),
         dict(default=1, type=int, help='number of gpus per node')),
        (('-nr', '--nr'),
         dict(default=0, type=int, help='ranking within the nodes')),
        (('--epochs',),
         dict(default=2, type=int, metavar='N',
              help='number of total epochs to run')),
    )
    cli = argparse.ArgumentParser()
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    train(0, cli.parse_args())


# Start training when the file is executed as a script.
if __name__ == '__main__':
    main()
你可能注意到有些参数是多余的,但是对后面的分布式训练是有用的。我们通过执行以下语句就可以在单机单卡上训练:
python mnist-cpu.py -n 1 -g 1 -nr 0
分布式训练-GPU
使用多进程进行分布式训练,我们需要为每个GPU启动一个进程。每个进程需要知道自己运行在哪个GPU上,以及自身在所有进程中的序号。对于多节点,我们需要在每个节点启动脚本。
args.nodes——节点总数
args.gpus——每个节点的GPU总数(每个节点GPU数是一样的)
args.nr ——当前节点在所有节点的序号。
world_size——节点总数乘以每个节点的GPU数可以得到world_size,也即进程总数。
所有的进程需要知道进程0的IP地址以及端口,这样所有进程可以在开始时同步,一般情况下称进程0是master进程,比如我们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点所有进程,每个进程运行train(i, args),其中i从0到args.gpus - 1。
# mnist-distributed.py
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
# Use the CUDA device when one is available, otherwise fall back to the CPU.
# NOTE(review): train() initialises the 'nccl' backend, which presumably needs
# CUDA GPUs, so the CPU fallback looks unusable in this script - confirm.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 定义一个简单的CNN模型处理MNIST数据
class ConvNet(nn.Module):
    """Small two-stage convolutional classifier for 1-channel 28x28 images.

    Each stage is Conv2d(5x5, padding 2) -> BatchNorm2d -> ReLU -> MaxPool2d(2),
    so the spatial size is halved per stage (28 -> 14 -> 7) while the channel
    count grows 1 -> 16 -> 32. The flattened 7*7*32 features feed one linear
    layer that produces the class scores.

    Args:
        num_classes: Number of output classes (size of the final layer).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        # 7*7*32 is the flattened size of layer2's output for a 28x28 input.
        self.fc = nn.Linear(7 * 7 * 32, num_classes)

    def forward(self, x):
        """Return unnormalised class scores of shape (batch, num_classes)."""
        out = self.layer1(x)
        out = self.layer2(out)
        # Flatten everything except the batch dimension for the linear layer.
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out
# 训练部分主函数
def train(gpu, args):
    """Run one worker process of DistributedDataParallel training on MNIST.

    Meant to be launched once per local GPU (``main`` does this through
    ``mp.spawn``), which supplies ``gpu`` as the first argument.

    Args:
        gpu: Index of the GPU on this node that the process trains on.
            Progress lines and the total duration are printed only when
            this is 0.
        args: Namespace providing ``nr`` (index of this node), ``gpus``
            (GPUs per node), ``world_size`` (total number of processes) and
            ``epochs``.
    """
    # Global rank of this process across all nodes.
    rank = args.nr * args.gpus + gpu

    # Join the process group over NCCL. With init_method='env://' the master
    # address and port are read from the MASTER_ADDR / MASTER_PORT environment
    # variables; world size and rank are passed explicitly here instead of
    # through WORLD_SIZE / RANK. The call blocks until every process has
    # joined and fails if any of them errors out.
    dist.init_process_group(
        backend='nccl',
        init_method='env://',
        world_size=args.world_size,
        rank=rank,
    )

    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100

    # Loss function and optimizer.
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)

    # Wrapping the model in DistributedDataParallel is all that is needed on
    # the model side; it performs the gradient all-reduce behind the scenes.
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

    # MNIST training split stored under ./data (download=True).
    train_dataset = torchvision.datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True,
    )

    # On the data side, torch.utils.data.distributed.DistributedSampler gives
    # each process its own shard of the dataset; the DataLoader only has to
    # use it as its sampler (hence shuffle=False on the loader itself).
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset,
        num_replicas=args.world_size,
        rank=rank,
    )
    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
        sampler=train_sampler,
    )

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        # Re-seed the sampler so every epoch uses a different shuffle; without
        # this call all epochs iterate the data in the same order.
        train_sampler.set_epoch(epoch)
        for i, (images, labels) in enumerate(train_loader):
            # Move the batch to this process's own GPU explicitly.
            images = images.cuda(gpu, non_blocking=True)
            labels = labels.cuda(gpu, non_blocking=True)

            # Forward pass.
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Report every 100th step.
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1,
                    args.epochs,
                    i + 1,
                    total_step,
                    loss.item()))
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))

    # Release the communication resources held by the process group.
    dist.destroy_process_group()
# 主函数main()接受参数,执行训练
def main():
    """Parse the command-line options and spawn one training process per GPU.

    Computes ``args.world_size`` as ``gpus * nodes``, exports the rendezvous
    address through the ``MASTER_ADDR`` / ``MASTER_PORT`` environment
    variables, then starts ``args.gpus`` processes that each run
    ``train(i, args)``.

    ``--master-addr`` and ``--master-port`` default to the values that used to
    be hard-coded, so existing invocations behave exactly as before.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1,
                        type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('--epochs', default=2, type=int,
                        metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('--master-addr', default='10.57.23.164',
                        help='IP address of the node that runs process 0')
    parser.add_argument('--master-port', default='8888',
                        help='port on the master node used for rendezvous')
    args = parser.parse_args()

    # Total number of processes across all nodes.
    args.world_size = args.gpus * args.nodes

    # Read by dist.init_process_group(init_method='env://') in each worker.
    os.environ['MASTER_ADDR'] = args.master_addr
    os.environ['MASTER_PORT'] = args.master_port

    mp.spawn(train, nprocs=args.gpus, args=(args,))


# Start training when the file is executed as a script.
if __name__ == '__main__':
    main()
最后就可以执行代码了,比如我们有4个节点,每个节点8张卡,那么需要在4个节点上分别执行(其中i为当前节点的序号,依次取0、1、2、3):
python src/mnist-distributed.py -n 4 -g 8 -nr i
参考链接:Pytorch分布式训练简明教程
租GPU服务器相关
本机没有GPU环境的小伙伴可以租用MistGPU服务器进行训练,教程:【Pytorch分布式训练】MistGPU服务器训练



