SINGLE-MACHINE MODEL PARALLEL BEST PRACTICES

本文译自PYTORCH并行处理:Author: Shen Li

模型并行在分布式训练中很常用。pytorch本身就用 DataParallel 做并行训练,使用非常简单。思想也比较直观:将模型复制到多个GPU上,然后每个gpu计算输入的一部分。尽管这个方法可以加速训练,但是当模型太大以至于放不下一个单独gpu时就不太好用了。本文利用另一种方法:model parallel(并非像DataParallel是一个函数,而是一种操作)。相比于DataParallel,该方法将一个模型分解后放到不同GPU上,而不是复制模型到不同GPU上。(栗子:一个总共10层的模型,当利用DataParallel,每个GPU将会有这个模型的全部10层,而利用model parallel在两个GPU上时,每个GPU仅仅有5层。)

model parallel是将一个模型的不同子网络放到不同设备上,在实现forward函数时手动移动或合并不同device上的数据。由于每个设备上仅有一部分模型,所以全部设备足以撑起一个比较大的模型。本文不会构建一个大型模型然后将其压缩至有限的GPU上,而是关注弄清model parallel的原理。具体应用取决于读者的具体应用。

对于多机model parallel训练,参考: Getting Started With Distributed RPC Framework 

Basic Usage

从包含两个线性层的简易模型开始。要在两个GPU上运行此模型,只需将每个线性层放在不同的GPU上,然后相应地移动输入和中间输出以匹配devices。

import torch
import torch.nn as nn
import torch.optim as optim


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = torch.nn.Linear(10, 10).to('cuda:0')
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to('cuda:1')

    def forward(self, x):
        x = self.relu(self.net1(x.to('cuda:0')))
        return self.net2(x.to('cuda:1'))

注意to(devide)的用法,放在不同设备上。 

注意到ToyModel实现看起来与在单一GPU上实现非常相似,除了一些to(device)的操作,将线性层个tensor置于合适的devices。这是唯一需要的一些改动。backward()和torch.optim将自动整理梯度。你只需要确保在计算loss时labels和输出在同一GPU上。

model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1')
loss_fn(outputs, labels).backward()
optimizer.step()

 

Apply Model Parallel to Existing Modules

只需改动几行就可以让已有的单GPU模型run在多个GPU上。以下代码展示了如何将torchvision.models.reset50() 分解到两个GPU上。思想史将其layers分解为两部分,然后重载forwaed函数来移动中间的输出。

from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000


class ModelParallelResNet50(ResNet):
    def __init__(self, *args, **kwargs):
        super(ModelParallelResNet50, self).__init__(
            Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)

        self.seq1 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,

            self.layer1,
            self.layer2
        ).to('cuda:0')

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to('cuda:1')

        self.fc.to('cuda:1')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to('cuda:1'))
        return self.fc(x.view(x.size(0), -1))

上面的实现解决了当模型太大不能在单一GPU上放下的问题。然后你可能注意到当你当模型在单一GPU可以放得下时,把它拆解后放到不同GPU上却会稍微变慢。这是因为在任何时候,仅仅只有一个GPU在工作,另一个只是在等待啥也不干。对于后者在反复的移动不同device上的中间结果时会导致额外的时间开销,以至于多个比一个还慢点。

为此做个实验验证一下,分别用随机数来训练这两个模型: ModelParallelResNet50 和torchvision.models.reset50()。

import torchvision.models as models

num_batches = 3
batch_size = 120
image_w = 128
image_h = 128


def train(model):
    model.train(True)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)

    one_hot_indices = torch.LongTensor(batch_size) 
                           .random_(0, num_classes) 
                           .view(batch_size, 1)

    for _ in range(num_batches):
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) 
                      .scatter_(1, one_hot_indices, 1)

        # run forward pass
        optimizer.zero_grad()
        outputs = model(inputs.to('cuda:0'))

        # run backward pass
        labels = labels.to(outputs.device)
        loss_fn(outputs, labels).backward()
        optimizer.step()

train(model)函数在3个批量上训练,每个批量120张图,利用timeie来run10次后统计时间。

import matplotlib.pyplot as plt
plt.switch_backend('Agg')
import numpy as np
import timeit

num_repeat = 10

stmt = "train(model)"

setup = "model = ModelParallelResNet50()"
# globals arg is only available in Python 3. In Python 2, use the following
# import __builtin__
# __builtin__.__dict__.update(locals())
mp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
mp_mean, mp_std = np.mean(mp_run_times), np.std(mp_run_times)

setup = "import torchvision.models as models;" + 
        "model = models.resnet50(num_classes=num_classes).to('cuda:0')"
rn_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
rn_mean, rn_std = np.mean(rn_run_times), np.std(rn_run_times)


def plot(means, stds, labels, fig_name):
    fig, ax = plt.subplots()
    ax.bar(np.arange(len(means)), means, yerr=stds,
           align='center', alpha=0.5, ecolor='red', capsize=10, width=0.6)
    ax.set_ylabel('ResNet50 Execution Time (Second)')
    ax.set_xticks(np.arange(len(means)))
    ax.set_xticklabels(labels)
    ax.yaxis.grid(True)
    plt.tight_layout()
    plt.savefig(fig_name)
    plt.close(fig)


plot([mp_mean, rn_mean],
     [mp_std, rn_std],
     ['Model Parallel', 'Single GPU'],
     'mp_vs_rn.png')

从上图可以看到model parallel的实现比但GPU模型开销增加了4.02/3.75-1=7% 。所以可以认为这7%的开销花在了copy不同GPU上的tensor。当然这里肯定有空间提升多GPU的表现:上面的model parallel方法在实现时将模型划分成分多个部分,这多个部分工作室串行的!!!即一个批量走完GPU-0再走GPU-1。试想这个批量在走GPU-0的时候其他GPU在休息,在走GPU-1的时候其他GPU也在休息。如何让所有GPU同时工作起来?一个解决方案就是划分批量!例如将批量大小均匀化分为两部分,第一部分数据先走GPU-0,走完之后,第一部分数据接着走GPU-1,同时第二部分开始走GPU-0...以此循环,没有GPU会空闲着。

Speed Up by Pipelining Inputs

在下面将会将120的批量大小划分为每份20张。由于PyTorch异步启动CUDA操作,因此实现不需要生成多个线程来实现并发性。

class PipelineParallelResNet50(ModelParallelResNet50):
    def __init__(self, split_size=20, *args, **kwargs):
        super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
        self.split_size = split_size

    def forward(self, x):
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq1(s_next).to('cuda:1')
        ret = []

        for s_next in splits:
            # A. s_prev runs on cuda:1
            s_prev = self.seq2(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

            # B. s_next runs on cuda:0, which can run concurrently with A
            s_prev = self.seq1(s_next).to('cuda:1')

        s_prev = self.seq2(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

        return torch.cat(ret)


setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)

plot([mp_mean, rn_mean, pp_mean],
     [mp_std, rn_std, pp_std],
     ['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
     'mp_vs_rn_vs_pp.png')

注意到,设备到设备tensor复制操作在源设备和目标设备上的当前流上同步。如果创建多个流,则必须确保复制操作正确同步。在完成复制操作之前写入源tensor或读/写目标张量可能会导致未定义的行为。上述实现仅在源设备和目标设备上使用默认流,因此不必强制执行额外的同步。

可以看到,这种方法不比model parallel ResNet50 快了3.75/2.51-1=49% !!!但是距离100%的理想加速还很远。为此我们可以去试着找出一种最优的split_size,上面我们的这个参数为20,但这不一定是最优参数。所以可以画图找出最优参数看看最多能加速多少。

means = []
stds = []
split_sizes = [1, 3, 5, 8, 10, 12, 20, 40, 60]

for split_size in split_sizes:
    setup = "model = PipelineParallelResNet50(split_size=%d)" % split_size
    pp_run_times = timeit.repeat(
        stmt, setup, number=1, repeat=num_repeat, globals=globals())
    means.append(np.mean(pp_run_times))
    stds.append(np.std(pp_run_times))

fig, ax = plt.subplots()
ax.plot(split_sizes, means)
ax.errorbar(split_sizes, means, yerr=stds, ecolor='red', fmt='ro')
ax.set_ylabel('ResNet50 Execution Time (Second)')
ax.set_xlabel('Pipeline Split Size')
ax.set_xticks(split_sizes)
ax.yaxis.grid(True)
plt.tight_layout()
plt.savefig("split_size_tradeoff.png")
plt.close(fig)

这个图可以看出,split_size为12时最优,将会有 3.75/2.43-1=54% 的加速!当然话有机会继续加速训练。例如,cuda:0上的所有操作都放在其默认流上。这意味着下次拆分的计算不能与上一次拆分的复制操作重叠。但是,由于prev和next拆分是不同的张量,因此不存在将一个计算与另一个计算重叠的问题。实现需要在两个gpu上使用多个流,不同的子网结构需要不同的流管理策略。由于没有一个通用的多流解决方案适用于所有模型并行用例,我们将在本教程中不讨论它。

原文地址:https://www.cnblogs.com/king-lps/p/12724360.html