栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

【Pytorch分布式训练】在MNIST数据集上训练一个简单CNN网络,将其改成分布式训练

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Pytorch分布式训练】在MNIST数据集上训练一个简单CNN网络,将其改成分布式训练

文章目录

普通单卡训练-GPU普通单卡训练-CPU分布式训练-GPU租GPU服务器相关
以下代码示例基于:在MNIST数据集上训练一个简单CNN网络,将其改成分布式训练。

普通单卡训练-GPU
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist

class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

def train(gpu, args):
	torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=0,
                                               pin_memory=True)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1,
                    args.epochs,
                    i + 1,
                    total_step,
                    loss.item())
                   )
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('--epochs', default=2, type=int, metavar='N',
                        help='number of total epochs to run')
    args = parser.parse_args()
    train(0, args)

if __name__ == '__main__':
    main()
普通单卡训练-CPU

以上代码在没有GPU的环境上运行会报AssertionError: Torch not compiled with CUDA enabled的错
在程序开始处加上:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


代码其余地方出现.cuda()的地方改成.to(device)就可以在无gpu的环境中运行了,改完以后就是下面单卡训练——CPU。

CPU单卡训练完整代码如下

# mnist-cpu.py
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 定义一个简单的CNN模型处理MNIST数据
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

# 训练部分主函数
def train(gpu, args):
	#torch.manual_seed(0)
    model = ConvNet()
    #torch.cuda.set_device(gpu)
    #model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    #criterion = nn.CrossEntropyLoss().cuda(gpu)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=0,
                                               pin_memory=True)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        for i, (images, labels) in enumerate(train_loader):
            #images = images.cuda(non_blocking=True)
            #labels = labels.cuda(non_blocking=True)
            images = images.to(device)
            labels = labels.to(device)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1,
                    args.epochs,
                    i + 1,
                    total_step,
                    loss.item())
                   )
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))

# 主函数main()接受参数,执行训练
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('--epochs', default=2, type=int, metavar='N',
                        help='number of total epochs to run')
    args = parser.parse_args()
    train(0, args)

# 通过启动主函数来开始训练
if __name__ == '__main__':
    main()

你可能注意到有些参数是多余的,但是对后面的分布式训练是有用的。我们通过执行以下语句就可以在单机单卡上训练:

python mnist-cpu.py -n 1 -g 1 -nr 0

分布式训练-GPU

使用多进程进行分布式训练,我们需要为每个GPU启动一个进程。每个进程需要知道自己运行在哪个GPU上,以及自身在所有进程中的序号。对于多节点,我们需要在每个节点启动脚本。

args.nodes——节点总数
args.gpus——每个节点的GPU总数(每个节点GPU数是一样的)
args.nr ——当前节点在所有节点的序号。
world_size——节点总数乘以每个节点的GPU数可以得到world_size,也即进程总数。
所有的进程需要知道进程0的IP地址以及端口,这样所有进程可以在开始时同步,一般情况下称进程0是master进程,比如我们会在进程0中打印信息或者保存模型。PyTorch提供了mp.spawn来在一个节点启动该节点所有进程,每个进程运行train(i, args),其中i从0到args.gpus - 1。

# mnist-cpu-distributed.py
import os
from datetime import datetime
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 定义一个简单的CNN模型处理MNIST数据
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

# 训练部分主函数
def train(gpu, args):
    rank = args.nr * args.gpus + gpu # 首先计算出当前进程序号:rank = args.nr * args.gpus + gpu

    dist.init_process_group( # 通过dist.init_process_group初始化分布式环境
        backend='nccl', # backend参数指定通信后端,包括mpi, gloo, nccl,这里选择nccl,这是Nvidia提供的官方多卡通信框架,相对比较高效。
        init_method='env://',
        # init_method指的是如何初始化,以完成刚开始的进程同步;这里我们设置的是env://,
        # 指的是环境变量初始化方式,需要在环境变量中配置4个参数:MASTER_PORT,MASTER_ADDR,WORLD_SIZE,RANK,
        # 前面两个参数我们已经配置,后面两个参数也可以通过dist.init_process_group函数中world_size和rank参数配置。
        # 其它的初始化方式还包括共享文件系统以及TCP,比如init_method='tcp://10.1.1.20:23456',其实也是要提供master的IP地址和端口。
        # 注意这个调用是阻塞的,必须等待所有进程来同步,如果任何一个进程出错,就会失败。
        world_size=args.world_size,
        rank=rank
    )

    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)
    model.cuda(gpu)
    batch_size = 100
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)

    # Wrap the model
    # 对于模型侧,我们只需要用DistributedDataParallel包装一下原来的model即可,在背后它会支持梯度的All-Reduce操作。
    model = nn.parallel.DistributedDataParallel(model,
                                                device_ids=[gpu])

    # Data loading code
    train_dataset = torchvision.datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.ToTensor(),
        download=True
    )

    # 对于数据侧,我们nn.utils.data.DistributedSampler来给各个进程切分数据,只需要在dataloader中使用这个sampler就好,
    # 值得注意的一点是你要训练循环过程的每个epoch开始时调用train_sampler.set_epoch(epoch),(
    # 主要是为了保证每个epoch的划分是不同的)其它的训练代码都保持不变。
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset,
        num_replicas=args.world_size,
        rank=rank
    )

    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
        sampler=train_sampler)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.to(device)
            labels = labels.to(device)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1,
                    args.epochs,
                    i + 1,
                    total_step,
                    loss.item())
                   )
    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))

# 主函数main()接受参数,执行训练
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1,
                        type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('--epochs', default=2, type=int,
                        metavar='N',
                        help='number of total epochs to run')
    args = parser.parse_args()
    args.world_size = args.gpus * args.nodes
    os.environ['MASTER_ADDR'] = '10.57.23.164' # 服务器ip地址
    os.environ['MASTER_PORT'] = '8888'
    mp.spawn(train, nprocs=args.gpus, args=(args,))

# 通过启动主函数来开始训练
if __name__ == '__main__':
    main()

最后就可以执行代码了,比如我们是4节点,每个节点是8卡,那么需要在4个节点分别执行:

python src/mnist-distributed.py -n 4 -g 8 -nr i

参考链接:Pytorch分布式训练简明教程

租GPU服务器相关

本机没有GPU环境的小伙伴可以租用MistGPU服务器进行训练,教程:【Pytorch分布式训练】MistGPU服务器训练

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/754018.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号