# 管道并行 > 原文:[`pytorch.org/docs/stable/pipeline.html`](https://pytorch.org/docs/stable/pipeline.html)> > > 译者:[飞龙](https://github.com/wizardforcel) > > 协议:[CC BY-NC-SA 4.0](http://creativecommons.org/licenses/by-nc-sa/4.0/) 管道并行最初在[Gpipe](https://arxiv.org/abs/1811.06965)论文中提出,并且是一种有效的技术,用于在多个 GPU 上训练大型模型。 警告 管道并行是实验性的,可能会发生变化。 ## 使用多个 GPU 的模型并行 通常对于无法放入单个 GPU 的大型模型,会采用模型并行,其中模型的某些部分被放置在不同的 GPU 上。然而,如果对于顺序模型进行简单划分,训练过程会因 GPU 的低利用率而受到影响,因为如下图所示,一次只有一个 GPU 处于活动状态: ![_images/no_pipe.png](img/b9cf9a633037f50f7bc1ebee273078d5.png) 该图表示一个具有 4 层的模型,这些层分布在 4 个不同的 GPU 上(垂直轴)。水平轴表示通过时间训练该模型,演示了每次只有 1 个 GPU 被利用([图片来源](https://arxiv.org/abs/1811.06965))。 ## 管道化执行 为了缓解这个问题,管道并行将输入的小批量数据分成多个微批量数据,并将这些微批量数据的执行在多个 GPU 上进行管道化。如下图所示: ![_images/pipe.png](img/ef057fe1265f513c363e3e4cdc5a1cf7.png) 该图表示一个具有 4 层的模型,这些层分布在 4 个不同的 GPU 上(垂直轴)。水平轴表示通过时间训练该模型,演示了 GPU 的利用效率更高。然而,仍然存在一个气泡(如图所示),其中某些 GPU 未被利用。([图片来源](https://arxiv.org/abs/1811.06965)). ## PyTorch 中的 Pipe API ```py class torch.distributed.pipeline.sync.Pipe(module, chunks=1, checkpoint='except_last', deferred_batch_norm=False) ``` 将任意`nn.Sequential`模块包装起来,以便使用同步管道并行训练。如果模块需要大量内存且无法放入单个 GPU 中,则管道并行是一种有用的训练技术。 该实现基于[torchgpipe](https://arxiv.org/abs/2004.09910)论文。 Pipe 将管道并行与检查点结合起来,以减少训练所需的峰值内存,同时最大程度地减少设备的低利用率。 您应该将所有模块放在适当的设备上,并将它们包装成一个`nn.Sequential`模块,定义所需的执行顺序。如果一个模块不包含任何参数/缓冲区,则假定该模块应在 CPU 上执行,并且在执行之前,将模块的适当输入张量移动到 CPU。此行为可以通过`WithDevice`包装器覆盖,该包装器可用于明确指定模块应在哪个设备上运行。 参数 + **module**(`nn.Sequential`)- 要使用管道并行化的顺序模块。序列中的每个模块都必须将其所有参数放在单个设备上。序列中的每个模块都必须是 nn.Module 或`nn.Sequential`(用于在单个设备上组合多个顺序模块) + **chunks**([*int*](https://docs.python.org/3/library/functions.html#int "(在 Python v3.12 中)"))- 微批量的数量(默认值:`1`) + **checkpoint**([*str*](https://docs.python.org/3/library/stdtypes.html#str "(在 Python v3.12 中)"))- 何时启用检查点,可以是`'always'`、`'except_last'`或`'never'`之一(默认值:`'except_last'`)。`'never'`完全禁用检查点,`'except_last'`对除最后一个微批量之外的所有微批量启用检查点,`'always'`对所有微批量启用检查点。 + **deferred_batch_norm** ([*bool*](https://docs.python.org/3/library/functions.html#bool "(在 Python v3.12 中)")) – 是否使用延迟的`BatchNorm`移动统计信息(默认值:[`False`](https://docs.python.org/3/library/constants.html#False "(在 Python v3.12 中)")). 如果设置为[`True`](https://docs.python.org/3/library/constants.html#True "(在 Python v3.12 中)"), 我们跟踪跨多个微批次的统计信息,以更新每个小批次的运行统计信息。 引发 + [**TypeError**](https://docs.python.org/3/library/exceptions.html#TypeError "(在 Python v3.12 中)") – 模块不是`nn.Sequential`。 + [**ValueError**](https://docs.python.org/3/library/exceptions.html#ValueError "(在 Python v3.12 中)") – 无效参数 示例:: 跨 GPU 0 和 1 的两个 FC 层的管道。 ```py >>> # Need to initialize RPC framework first. >>> os.environ['MASTER_ADDR'] = 'localhost' >>> os.environ['MASTER_PORT'] = '29500' >>> torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) >>> >>> # Build pipe. >>> fc1 = nn.Linear(16, 8).cuda(0) >>> fc2 = nn.Linear(8, 4).cuda(1) >>> model = nn.Sequential(fc1, fc2) >>> model = Pipe(model, chunks=8) >>> input = torch.rand(16, 16).cuda(0) >>> output_rref = model(input) ``` 注意 您可以使用`torch.nn.parallel.DistributedDataParallel`将`Pipe`模型包装起来,只有当`Pipe`的检查点参数为`'never'`时才能这样做。 注意 `Pipe`目前仅支持节点内流水线处理,但将来将扩展为支持节点间流水线处理。前向函数返回一个`RRef`,以便将来进行节点间流水线处理,其中输出可能位于远程主机上。对于节点内流水线处理,您可以使用`local_value()`在本地检索输出。 警告 `Pipe`是实验性的,可能会发生变化。 ```py forward(*inputs) ``` 通过管道处理单个输入小批次并返回指向输出的`RRef`。`Pipe`是一个相当透明的模块包装器。它不修改底层模块的输入和输出签名。但有类型限制。输入和输出必须至少包含一个张量。此限制也适用于分区边界。 输入序列被馈送到管道的第一个阶段作为`*inputs`。因此,此函数的位置参数应与管道第一个阶段的位置参数匹配。对于管道的一个阶段的输出作为下一个阶段的输入也适用相同条件。 根据用于初始化`Pipe`的`chunks`参数,将输入张量分成多个微批次。假定批量大小是张量的第一个维度,如果批量大小小于`chunks`,则微批次的数量等于批量大小。 只有张量会被分成多个微批次,非张量输入只会在每个微批次中按原样复制。对于管道的最后阶段中的非张量输出,它们将作为`List`聚合并返回给用户。例如,如果有 2 个微批次返回整数 5,则用户将收到[5, 5]的合并输出 所有输入张量都需要与管道的第一个分区位于同一设备上。 如果张量使用`NoChunk`包装器包装,那么该张量不会在微批次之间分割,并且会按原样复制,类似于非张量。 参数 **inputs** – 输入小批次 返回 指向小批次输出的`RRef` 引发 [**TypeError**](https://docs.python.org/3/library/exceptions.html#TypeError "(在 Python v3.12 中)") – 输入不包含至少一个张量 返回类型 *RRef* ### 跳过连接 像[ResNeXt](https://pytorch.org/hub/pytorch_vision_resnext/)这样的某些模型并不完全是顺序的,它们在层之间有跳过连接。简单地作为管道并行的一部分实现会意味着我们需要通过多个 GPU 复制某些层的输出,直到最终到达包含跳过连接层的 GPU。为了避免这种复制开销,我们提供以下 API 来在模型的不同层中存储和弹出张量。 ```py torch.distributed.pipeline.sync.skip.skippable.skippable(stash=(), pop=()) ``` 定义一个装饰器来创建带有跳过连接的`nn.Module`。 这些装饰模块称为“可跳过的”。即使模块没有被`Pipe` 包装,此功能也能正常工作。 每个跳过张量都由其名称管理。在操作跳过张量之前,可跳过模块必须通过存储和/或弹出参数静态声明跳过张量的名称。具有预先声明名称的跳过张量可以通过 `yield stash(name, tensor)` 存储或通过 `tensor = yield pop(name)` 弹出。 这是一个具有三层的示例。一个名为“1to3”的跳过张量在第一层和最后一层分别存储和弹出: ```py @skippable(stash=['1to3']) class Layer1(nn.Module): def forward(self, input): yield stash('1to3', input) return f1(input) class Layer2(nn.Module): def forward(self, input): return f2(input) @skippable(pop=['1to3']) class Layer3(nn.Module): def forward(self, input): skip_1to3 = yield pop('1to3') return f3(input) + skip_1to3 model = nn.Sequential(Layer1(), Layer2(), Layer3()) ``` 一个可跳过的模块可以存储或弹出多个跳过张量: ```py @skippable(stash=['alice', 'bob'], pop=['carol']) class StashStashPop(nn.Module): def forward(self, input): yield stash('alice', f_alice(input)) yield stash('bob', f_bob(input)) carol = yield pop('carol') return input + carol ``` 每个跳过张量必须与一对存储和弹出完全关联。`Pipe` 在包装模块时会自动检查此限制。您还可以通过`verify_skippables()` 而不使用`Pipe` 来检查此限制。 返回类型 [*Callable*](https://docs.python.org/3/library/typing.html#typing.Callable "(在 Python v3.12 中)")[[[*Type*](https://docs.python.org/3/library/typing.html#typing.Type "(在 Python v3.12 中)")[*Module*]], [*Type*](https://docs.python.org/3/library/typing.html#typing.Type "(在 Python v3.12 中)")[*Skippable*]] ```py class torch.distributed.pipeline.sync.skip.skippable.stash(name, tensor) ``` 存储跳过张量的命令。 ```py def forward(self, input): yield stash('name', input) return f(input) ``` 参数 + **name** ([*str*](https://docs.python.org/3/library/stdtypes.html#str "(在 Python v3.12 中)")) – 跳过张量的名称 + **input** (*torch.Tensor* *或* *None*) – 传递给跳过连接的张量 ```py class torch.distributed.pipeline.sync.skip.skippable.pop(name) ``` 弹出跳过张量的命令。 ```py def forward(self, input): skip = yield pop('name') return f(input) + skip ``` 参数 **name** ([*str*](https://docs.python.org/3/library/stdtypes.html#str "(在 Python v3.12 中)")) – 跳过张量的名称 返回 之前由同一名称下的另一层存储的跳过张量 返回类型 无 ```py torch.distributed.pipeline.sync.skip.skippable.verify_skippables(module) ``` 验证底层可跳过模块是否满足完整性。 每个跳过张量必须只有一对存储和弹出。如果有一个或多个不匹配的对,它将引发[`TypeError`](https://docs.python.org/3/library/exceptions.html#TypeError "(在 Python v3.12 中)") 并附带详细消息。 以下是一些失败案例。`verify_skippables()` 将报告这些案例的失败: ```py # Layer1 stashes "1to3". # Layer3 pops "1to3". nn.Sequential(Layer1(), Layer2()) # └──── ? nn.Sequential(Layer2(), Layer3()) # ? ────┘ nn.Sequential(Layer1(), Layer2(), Layer3(), Layer3()) # └───────────────────┘ ^^^^^^ nn.Sequential(Layer1(), Layer1(), Layer2(), Layer3()) # ^^^^^^ └───────────────────┘ ``` 要为多个跳过张量使用相同的名称,它们必须通过不同的命名空间隔离。参见 `isolate()`。 引发 [**TypeError**](https://docs.python.org/3/library/exceptions.html#TypeError "(在 Python v3.12 中)") – 一个或多个存储和弹出不匹配的对。 ## 教程 以下教程提供了如何使用`Pipe` API 来训练模型以及 PyTorch 提供的其他组件的概述: + [使用管道并行训练 Transformer 模型](https://pytorch.org/tutorials/intermediate/pipeline_tutorial.html) + [使用分布式数据并行和管道并行训练 Transformer 模型](https://pytorch.org/tutorials/advanced/ddp_pipeline.html) ## 致谢 管道并行实现基于[fairscale 的管道实现](https://github.com/facebookresearch/fairscale/tree/main/fairscale/nn/pipe)和[torchgpipe](https://github.com/kakaobrain/torchgpipe)。我们要感谢两个团队对于他们在将管道并行引入 PyTorch 方面的贡献和指导。