# 分布式通讯包-Torch.Distributed > 原文: [https://pytorch.org/docs/stable/distributed.html](https://pytorch.org/docs/stable/distributed.html) ## 后端 `torch.distributed`支持三个后端,每个后端具有不同的功能。 下表显示了可用于 CPU / CUDA 张量的功能。 MPI 仅在用于构建 PyTorch 的实现支持 CUDA 的情况下才支持 CUDA。 | 后端 | `gloo` | `mpi` | `nccl` | | --- | --- | --- | --- | | 设备 | 中央处理器 | 显卡 | CPU | GPU | CPU | GPU | | --- | --- | --- | --- | --- | --- | --- | | 发送 | ✓ | ✘ | ✓ | ? | ✘ | ✘ | | 收录 | ✓ | ✘ | ✓ | ? | ✘ | ✘ | | 广播 | ✓ | ✓ | ✓ | ? | ✘ | ✓ | | all_reduce | ✓ | ✓ | ✓ | ? | ✘ | ✓ | | 降低 | ✓ | ✘ | ✓ | ? | ✘ | ✓ | | all_gather | ✓ | ✘ | ✓ | ? | ✘ | ✓ | | 收集 | ✓ | ✘ | ✓ | ? | ✘ | ✘ | | 分散 | ✓ | ✘ | ✓ | ? | ✘ | ✘ | | 屏障 | ✓ | ✘ | ✓ | ? | ✘ | ✓ | ### PyTorch 随附的后端 当前分发的 PyTorch 仅支持 Linux。 默认情况下,会构建 Gloo 和 NCCL 后端并将其包含在 PyTorch 分布式中(仅在使用 CUDA 进行构建时才为 NCCL)。 MPI 是可选的后端,仅当您从源代码构建 PyTorch 时,才可以包含它。 (例如,在安装了 MPI 的主机上构建 PyTorch。) ### 使用哪个后端? 在过去,我们经常被问到:“我应该使用哪个后端?”。 * 经验法则 * 使用 NCCL 后端进行分布式 **GPU** 训练 * 使用 Gloo 后端进行分布式 **CPU** 训练。 * 具有 InfiniBand 互连的 GPU 主机 * 使用 NCCL,因为它是当前唯一支持 InfiniBand 和 GPUDirect 的后端。 * 具有以太网互连的 GPU 主机 * 使用 NCCL,因为它目前提供最佳的分布式 GPU 训练性能,尤其是对于多进程单节点或多节点分布式训练。 如果您在使用 NCCL 时遇到任何问题,请使用 Gloo 作为后备选项。 (请注意,对于 GPU,Gloo 当前的运行速度比 NCCL 慢。) * 具有 InfiniBand 互连的 CPU 主机 * 如果您的 InfiniBand 已启用 IB IP,请使用 Gloo,否则,请使用 MPI。 我们计划在即将发布的版本中增加 InfiniBand 对 Gloo 的支持。 * 具有以太网互连的 CPU 主机 * 除非有特殊原因要使用 MPI,否则请使用 Gloo。 ### 常见环境变量 #### 选择要使用的网络接口 默认情况下,NCCL 和 Gloo 后端都将尝试找到要使用的正确网络接口。 如果自动检测到的接口不正确,则可以使用以下环境变量(适用于各自的后端)覆盖它: * **NCCL_SOCKET_IFNAME** ,例如`export NCCL_SOCKET_IFNAME=eth0` * **GLOO_SOCKET_IFNAME** ,例如`export GLOO_SOCKET_IFNAME=eth0` 如果您使用的是 Gloo 后端,则可以用逗号分隔多个接口,例如:`export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3`。 后端将以循环方式在这些接口之间调度操作。 至关重要的是,所有进程都必须在此变量中指定相同数量的接口。 #### 其他 NCCL 环境变量 NCCL 还提供了许多环境变量以进行微调。 常用的调试工具包括以下内容: * `export NCCL_DEBUG=INFO` * `export NCCL_DEBUG_SUBSYS=ALL` 有关 NCCL 环境变量的完整列表,请参阅 [NVIDIA NCCL 的官方文档](https://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/docs/env.html) ## 基本 <cite>torch分布式</cite>程序包提供 PyTorch 支持和通信原语,以实现在一台或多台机器上运行的多个计算节点之间的多进程并行性。 类 [`torch.nn.parallel.DistributedDataParallel()`](nn.html#torch.nn.parallel.DistributedDataParallel "torch.nn.parallel.DistributedDataParallel") 建立在此功能之上,以提供同步的分布式训练,作为围绕任何 PyTorch 模型的包装。 这与 [Multiprocessing 程序包提供的并行性不同– Torch.multiprocessing](multiprocessing.html) 和 [`torch.nn.DataParallel()`](nn.html#torch.nn.DataParallel "torch.nn.DataParallel") 支持多个联网的机器,并且用户必须明确启动一个单独的 每个过程的主要训练脚本的副本。 在单机同步情况下,<cite>torch分布式</cite>或 [`torch.nn.parallel.DistributedDataParallel()`](nn.html#torch.nn.parallel.DistributedDataParallel "torch.nn.parallel.DistributedDataParallel") 包装器可能仍比其他数据并行方法(包括 [`torch.nn.DataParallel()`](nn.html#torch.nn.DataParallel "torch.nn.DataParallel"))具有优势 ]: * 每个过程都维护自己的优化器,并在每次迭代时执行完整的优化步骤。 尽管这看起来可能是多余的,但由于梯度已经被收集在一起并在各个过程之间求平均,因此对于每个过程都是相同的,这意味着不需要参数广播步骤,从而减少了在节点之间传递张量的时间。 * 每个进程都包含一个独立的 Python 解释器,从而消除了由单个 Python 进程驱动多个执行线程,模型副本或 GPU 所带来的额外解释器开销和“ GIL 颠簸”。 这对于大量使用 Python 运行时的模型尤其重要,包括具有循环层或许多小组件的模型。 ## 初始化 在调用任何其他方法之前,需要使用 [`torch.distributed.init_process_group()`](#torch.distributed.init_process_group "torch.distributed.init_process_group") 函数初始化该程序包。 这将阻塞,直到所有进程都已加入。 * * * ``` torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='') ``` 初始化默认的分布式进程组,这还将初始化分布式程序包。 ``` There are 2 main ways to initialize a process group: ``` 1. 明确指定`store`,`rank`和`world_size`。 2. 指定`init_method`(URL 字符串),它指示在何处/如何发现对等方。 (可选)指定`rank`和`world_size`,或在 URL 中编码所有必需的参数并忽略它们。 如果两者均未指定,则将`init_method`假定为“ env://”。 参数 * **后端** (_str_ _或_ [_后端_](#torch.distributed.Backend "torch.distributed.Backend"))–要使用的后端。 根据构建时配置,有效值包括`mpi`,`gloo`和`nccl`。 该字段应以小写字符串(例如`"gloo"`)形式给出,也可以通过 [`Backend`](#torch.distributed.Backend "torch.distributed.Backend") 属性(例如`Backend.GLOO`)进行访问。 如果每台具有`nccl`后端的计算机使用多个进程,则每个进程必须对其使用的每个 GPU 都具有独占访问权限,因为在进程之间共享 GPU 可能会导致死锁。 * **init_method** (_str_ _,_ _可选_)–指定如何初始化进程组的 URL。 如果未指定`init_method`或`store`,则默认值为“ env://”。 与`store`互斥。 * **world_size** (_python:int_ _,_ _可选_)–参与作业的进程数。 如果指定`store`,则为必需。 * **等级** (_python:int_ _,_ _可选_)–当前进程的等级。 如果指定`store`,则为必需。 * **存储区**(_存储区_ _,_ _可选_)–所有工作人员均可访问的键/值存储,用于交换连接/地址信息。 与`init_method`互斥。 * **超时** (_timedelta_ _,_ _可选_)–针对进程组执行的操作的超时。 默认值等于 30 分钟。 这适用于`gloo`后端。 对于`nccl`,仅在环境变量`NCCL_BLOCKING_WAIT`设置为 1 时适用。 * **group_name** (_str_ _,_ _可选_ _,_ _不推荐使用_)–组名。 要启用`backend == Backend.MPI`,PyTorch 需要从支持 MPI 的系统上的源代码构建。 NCCL 同样适用。 * * * ``` class torch.distributed.Backend ``` 类似于枚举的可用后端类:GLOO,NCCL 和 MPI。 此类的值是小写字符串,例如`"gloo"`。 可以将它们作为属性来访问,例如`Backend.NCCL`。 可以直接调用此类来解析字符串,例如`Backend(backend_str)`将检查`backend_str`是否有效,如果有效,则返回已解析的小写字符串。 它还接受大写字符串,例如`Backend("GLOO")`返回`"gloo"`。 注意 条目`Backend.UNDEFINED`存在,但仅用作某些字段的初始值。 用户既不应直接使用它,也不应该假定它的存在。 * * * ``` torch.distributed.get_backend(group=) ``` 返回给定进程组的后端。 Parameters **组** (_ProcessGroup_ _,_ _可选_)–要处理的过程组。 默认值为常规主流程组。 如果指定了另一个特定组,则调用过程必须是`group`的一部分。 退货 给定进程组的后端,为小写字符串。 * * * ``` torch.distributed.get_rank(group=) ``` 返回当前进程组的等级 等级是分配给分布式过程组中每个过程的唯一标识符。 它们始终是从 0 到`world_size`的连续整数。 Parameters **组** (_ProcessGroup_ _,_ _可选_)–要处理的进程组 Returns 进程组-1(如果不属于组)的等级 * * * ``` torch.distributed.get_world_size(group=) ``` 返回当前进程组中的进程数 Parameters **group** (_ProcessGroup__,_ _optional_) – The process group to work on Returns 进程组-1 的世界大小,如果不是该组的一部分 * * * ``` torch.distributed.is_initialized() ``` 检查默认进程组是否已初始化 * * * ``` torch.distributed.is_mpi_available() ``` 检查 MPI 后端是否可用。 * * * ``` torch.distributed.is_nccl_available() ``` 检查 NCCL 后端是否可用。 * * * 当前支持三种初始化方法: ### TCP 初始化 有两种使用 TCP 进行初始化的方式,两种方式都需要所有进程都可以访问的网络地址以及所需的`world_size`。 第一种方法要求指定一个地址,该地址属于等级 0 进程。 此初始化方法要求所有进程都具有手动指定的等级。 请注意,最新的分布式程序包中不再支持多播地址。 `group_name`也已弃用。 ``` import torch.distributed as dist # Use address of one of the machines dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4) ``` ### 共享文件系统初始化 另一种初始化方法利用了文件系统以及所需的`world_size`,该文件系统可从组中的所有计算机共享并可见。 该 URL 应该以`file://`开头,并包含一个指向共享文件系统上不存在的文件(在现有目录中)的路径。 如果文件系统初始化不存在,则会自动创建该文件,但不会删除该文件。 因此,您有责任确保在相同文件路径/名称的下一个 [`init_process_group()`](#torch.distributed.init_process_group "torch.distributed.init_process_group") 调用之前清除文件。 请注意,最新的分布式软件包不再支持自动等级分配,并且`group_name`也已弃用。 警告 此方法假定文件系统支持使用`fcntl`进行锁定-大多数本地系统和 NFS 都支持它。 Warning 此方法将始终创建文件,并尽力在程序末尾清理并删除文件。 换句话说,使用文件 init 方法进行的每次初始化都需要一个全新的空文件,以使初始化成功。 如果再次使用先前初始化使用的同一文件(碰巧不会被清除),则这是意外行为,通常会导致死锁和失败。 因此,即使此方法将尽最大努力清除文件,但如果自动删除碰巧失败,您有责任确保在训练结束时将文件删除,以防止同一文件被删除。 在下一次再次使用。 如果您计划在同一文件名上多次调用 [`init_process_group()`](#torch.distributed.init_process_group "torch.distributed.init_process_group") ,这尤其重要。 换句话说,如果未删除/清除文件,然后对该文件再次调用 [`init_process_group()`](#torch.distributed.init_process_group "torch.distributed.init_process_group") ,则可能会失败。 经验法则是,每次调用 [`init_process_group()`](#torch.distributed.init_process_group "torch.distributed.init_process_group") 时,请确保文件不存在或为空。 ``` import torch.distributed as dist # rank should always be specified dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank) ``` ### 环境变量初始化 该方法将从环境变量中读取配置,从而可以完全自定义如何获取信息。 要设置的变量是: * `MASTER_PORT`-必填; 必须是等级为 0 的计算机上的空闲端口 * `MASTER_ADDR`-必填(0 级除外); 等级 0 节点的地址 * `WORLD_SIZE`-必填; 可以在此处或在调用 init 函数时进行设置 * `RANK`-必填; 可以在此处或在调用 init 函数时进行设置 等级为 0 的计算机将用于建立所有连接。 这是默认方法,这意味着不必指定`init_method`(也可以是`env://`)。 ## 团体 默认情况下,集合体在默认组(也称为世界)上运行,并要求所有进程进入分布式函数调用。 但是,某些工作负载可以从更细粒度的通信中受益。 这是分布式组起作用的地方。 [`new_group()`](#torch.distributed.new_group "torch.distributed.new_group") 功能可用于创建带有所有进程的任意子集的新组。 它返回一个不透明的组句柄,该句柄可以作为`group`参数提供给所有集合(集合是分布式函数,用于以某些众所周知的编程模式交换信息)。 * * * ``` torch.distributed.new_group(ranks=None, timeout=datetime.timedelta(0, 1800), backend=None) ``` 创建一个新的分布式组。 此功能要求主组中的所有进程(即,属于分布式作业的所有进程)都必须输入此功能,即使它们不会成为该组的成员也是如此。 此外,应在所有过程中以相同顺序创建组。 Parameters * **排名**(_列表_ _[_ _python:int_ _]_ )–组成员的等级列表。 * **超时** (_timedelta_ _,_ _可选_)–针对进程组执行的操作的超时。 默认值等于 30 分钟。 这仅适用于`gloo`后端。 * **后端** (_str_ _或_ [_后端_](#torch.distributed.Backend "torch.distributed.Backend") _,_ _可选_) –要使用的后端。 根据构建时配置,有效值为`gloo`和`nccl`。 默认情况下,使用与全局组相同的后端。 此字段应以小写字符串(例如`"gloo"`)形式给出,也可以通过 [`Backend`](#torch.distributed.Backend "torch.distributed.Backend") 属性(例如`Backend.GLOO`)进行访问。 Returns 可以分配给集体呼叫的分布式组的句柄。 ## 点对点通讯 * * * ``` torch.distributed.send(tensor, dst, group=, tag=0) ``` 同步发送张量。 Parameters * **张量** ([_tensor_](tensors.html#torch.Tensor "torch.Tensor"))–要发送的张量。 * **dst** (_python:int_ )–目标排名。 * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **标签** (_python:int_ _,_ _可选_)–与远程 recv 发送匹配的标签 * * * ``` torch.distributed.recv(tensor, src=None, group=, tag=0) ``` 同步接收张量。 Parameters * **张量** ([_tensor_](tensors.html#torch.Tensor "torch.Tensor"))–张量以填充接收到的数据。 * **src** (_python:int_ _,_ _可选_)–源排名。 如果未指定,将从任何进程中接收。 * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **标记** (_python:int_ _,_ _可选_)–用于将 recv 与远程发送匹配的标记 Returns 发件人等级-1,如果不属于该组 [`isend()`](#torch.distributed.isend "torch.distributed.isend") 和 [`irecv()`](#torch.distributed.irecv "torch.distributed.irecv") 在使用时返回分布式请求对象。 通常,此对象的类型是不确定的,因为它们永远不应该手动创建,但是可以保证它们支持两种方法: * `is_completed()`-如果操作完成,则返回 True * `wait()`-将阻止该过程,直到操作完成。 保证`is_completed()`一旦返回就返回 True。 * * * ``` torch.distributed.isend(tensor, dst, group=, tag=0) ``` 异步发送张量。 Parameters * **tensor** ([_Tensor_](tensors.html#torch.Tensor "torch.Tensor")) – Tensor to send. * **dst** (_python:int_) – Destination rank. * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **tag** (_python:int__,_ _optional_) – Tag to match send with remote recv Returns 分布式请求对象。 无,如果不是该组的一部分 * * * ``` torch.distributed.irecv(tensor, src, group=, tag=0) ``` 异步接收张量。 Parameters * **tensor** ([_Tensor_](tensors.html#torch.Tensor "torch.Tensor")) – Tensor to fill with received data. * **src** (_python:int_ )–源排名。 * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **tag** (_python:int__,_ _optional_) – Tag to match recv with remote send Returns A distributed request object. None, if not part of the group ## 同步和异步集体操作 每个集体操作功能都支持以下两种操作: 同步操作-`async_op`设置为 False 时的默认模式。 当函数返回时,可以确保执行了集合操作(​​如果它是 CUDA op,则不一定要完成,因为所有 CUDA ops 都是异步的),并且可以根据集合操作的数据调用任何进一步的函数。 在同步模式下,集合函数不返回任何内容 异步操作-当`async_op`设置为 True 时。 集合操作函数返回一个分布式请求对象。 通常,您不需要手动创建它,并且可以支持两种方法: * `is_completed()` - returns True if the operation has finished * `wait()`-将阻止该过程,直到操作完成。 ## 集体职能 * * * ``` torch.distributed.broadcast(tensor, src, group=, async_op=False) ``` 向整个组广播张量。 `tensor`在参与集合的所有进程中必须具有相同数量的元素。 Parameters * **张量** ([_tensor_](tensors.html#torch.Tensor "torch.Tensor"))–如果`src`是当前进程的等级,则发送数据,否则使用张量保存接收到的数据。 * **src** (_python:int_) – Source rank. * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool_ _,_ _可选_)–此 op 是否应为异步 op Returns 异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不是该组的一部分 * * * ``` torch.distributed.all_reduce(tensor, op=ReduceOp.SUM, group=, async_op=False) ``` 减少所有机器上的张量数据,以使所有机器都能得到最终结果。 调用之后,`tensor`将在所有进程中按位相同。 Parameters * **张量** ([_tensor_](tensors.html#torch.Tensor "torch.Tensor"))–集合的输入和输出。 该功能就地运行。 * **op** (_可选_)–来自`torch.distributed.ReduceOp`枚举的值之一。 指定用于逐元素精简的操作。 * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group * * * ``` torch.distributed.reduce(tensor, dst, op=ReduceOp.SUM, group=, async_op=False) ``` 减少所有机器上的张量数据。 只有等级为`dst`的进程才能收到最终结果。 Parameters * **tensor** ([_Tensor_](tensors.html#torch.Tensor "torch.Tensor")) – Input and output of the collective. The function operates in-place. * **dst** (_python:int_ )–目标排名 * **op** (_optional_) – One of the values from `torch.distributed.ReduceOp` enum. Specifies an operation used for element-wise reductions. * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group * * * ``` torch.distributed.all_gather(tensor_list, tensor, group=, async_op=False) ``` 在列表中收集整个组的张量。 Parameters * **tensor_list** (_列表_ _[_ [_tensor_](tensors.html#torch.Tensor "torch.Tensor") _]_ )–输出列表。 它应包含正确大小的张量以用于集合的输出。 * **张量** ([_tensor_](tensors.html#torch.Tensor "torch.Tensor"))–要从当前进程广播的张量。 * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group * * * ``` torch.distributed.gather(tensor, gather_list=None, dst=0, group=, async_op=False) ``` 在单个过程中收集张量列表。 Parameters * **张量** ([_tensor_](tensors.html#torch.Tensor "torch.Tensor"))–输入张量。 * **collect_list** (_列表_ _[_ [_tensor_](tensors.html#torch.Tensor "torch.Tensor") _]_ _,_ _可选_)–用于收集数据的适当大小的张量列表(默认为 None,必须在目标等级上指定) * **dst** (_python:int_ _,_ _可选_)–目标排名(默认为 0) * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group * * * ``` torch.distributed.scatter(tensor, scatter_list=None, src=0, group=, async_op=False) ``` 将张量列表分散到组中的所有进程。 每个进程将仅接收一个张量并将其数据存储在`tensor`参数中。 Parameters * **张量** ([_tensor_](tensors.html#torch.Tensor "torch.Tensor"))–输出张量。 * **scatter_list** (_列表_ _[_ [_tensor_](tensors.html#torch.Tensor "torch.Tensor") _]_ )–分散的张量列表 (默认为无,必须在源排名上指定) * **src** (_python:int_ )–源排名(默认为 0) * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group * * * ``` torch.distributed.barrier(group=, async_op=False) ``` 同步所有进程。 如果 async_op 为 False,或者在 wait()上调用了异步工作句柄,则该集合将阻塞进程,直到整个组都进入该函数。 Parameters * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group * * * ``` class torch.distributed.ReduceOp ``` 可用还原操作的类枚举类:`SUM`,`PRODUCT`,`MIN`,`MAX`,`BAND`,`BOR`和`BXOR`。 此类的值可以作为属性访问,例如`ReduceOp.SUM`。 它们用于指定减少集合体的策略,例如 [`reduce()`](#torch.distributed.reduce "torch.distributed.reduce") , [`all_reduce_multigpu()`](#torch.distributed.all_reduce_multigpu "torch.distributed.all_reduce_multigpu") 等。 成员: > 和 > > 产品 > > 最小 > > 最大值 > > 带 > > BOR > > 异或 * * * ``` class torch.distributed.reduce_op ``` 减少操作的不推荐枚举类:`SUM`,`PRODUCT`,`MIN`和`MAX`。 建议改用 [`ReduceOp`](#torch.distributed.ReduceOp "torch.distributed.ReduceOp") 。 ## 多 GPU 集合功能 如果每个节点上有多个 GPU,则在使用 NCCL 和 Gloo 后端时, [`broadcast_multigpu()`](#torch.distributed.broadcast_multigpu "torch.distributed.broadcast_multigpu") [`all_reduce_multigpu()`](#torch.distributed.all_reduce_multigpu "torch.distributed.all_reduce_multigpu") [`reduce_multigpu()`](#torch.distributed.reduce_multigpu "torch.distributed.reduce_multigpu") 和 [`all_gather_multigpu()`](#torch.distributed.all_gather_multigpu "torch.distributed.all_gather_multigpu") 支持在每个节点内的多个 GPU 之间进行分布式集体操作。 这些功能可以潜在地改善整体分布式训练性能,并且可以通过传递张量列表轻松使用。 传递的张量列表中的每个张量必须位于调用该函数的主机的单独 GPU 设备上。 请注意,在所有分布式过程中,张量列表的长度必须相同。 另请注意,当前只有 NCCL 后端支持多 GPU 集合功能。 例如,如果我们用于分布式训练的系统有 2 个节点,每个节点都有 8 个 GPU。 在 16 个 GPU 的每个 GPU 上,都有一个我们想全部减少的张量。 以下代码可以作为参考: 在节点 0 上运行的代码 ``` import torch import torch.distributed as dist dist.init_process_group(backend="nccl", init_method="file:///distributed_test", world_size=2, rank=0) tensor_list = [] for dev_idx in range(torch.cuda.device_count()): tensor_list.append(torch.FloatTensor([1]).cuda(dev_idx)) dist.all_reduce_multigpu(tensor_list) ``` 在节点 1 上运行的代码 ``` import torch import torch.distributed as dist dist.init_process_group(backend="nccl", init_method="file:///distributed_test", world_size=2, rank=1) tensor_list = [] for dev_idx in range(torch.cuda.device_count()): tensor_list.append(torch.FloatTensor([1]).cuda(dev_idx)) dist.all_reduce_multigpu(tensor_list) ``` 调用之后,两个节点上的所有 16 张量将具有全部约简值 16 * * * ``` torch.distributed.broadcast_multigpu(tensor_list, src, group=, async_op=False, src_tensor=0) ``` 将张量广播到整个组,每个节点具有多个 GPU 张量。 `tensor`在参与集合的所有进程的所有 GPU 中必须具有相同数量的元素。 列表中的每个张量必须在不同的 GPU 上 当前仅支持 nccl 和 gloo 后端张量应仅是 GPU 张量 Parameters * **tensor_list** (_列表_ _[_ [_tensor_](tensors.html#torch.Tensor "torch.Tensor") _]_ )–参与集体的张量 操作。 如果`src`是等级,则`tensor_list`(`tensor_list[src_tensor]`)的指定`src_tensor`元素将在 src 进程中广播给所有其他张量(在不同 GPU 上),而在其他非张量中`tensor_list`的所有张量 -src 进程。 您还需要确保所有调用此函数的分布式进程的`len(tensor_list)`相同。 * **src** (_python:int_) – Source rank. * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op * **src_tensor** (_python:int_ _,_ _可选_)–在`tensor_list`内的源张量等级 Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group * * * ``` torch.distributed.all_reduce_multigpu(tensor_list, op=ReduceOp.SUM, group=, async_op=False) ``` 减少所有机器上的张量数据,以使所有机器都能得到最终结果。 此功能可减少每个节点上的张量数量,而每个张量位于不同的 GPU 上。 因此,张量列表中的输入张量必须是 GPU 张量。 同样,张量列表中的每个张量都需要驻留在不同的 GPU 上。 调用之后,`tensor_list`中的所有`tensor`在所有进程中都将按位相同。 当前仅支持 nccl 和 gloo 后端张量应仅是 GPU 张量 Parameters * **列表**(_tensor_)–集合的输入和输出张量的列表。 该函数在原位运行,并且要求每个张量都是不同 GPU 上的 GPU 张量。 您还需要确保所有调用此函数的分布式进程的`len(tensor_list)`相同。 * **op** (_optional_) – One of the values from `torch.distributed.ReduceOp` enum. Specifies an operation used for element-wise reductions. * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group * * * ``` torch.distributed.reduce_multigpu(tensor_list, dst, op=ReduceOp.SUM, group=, async_op=False, dst_tensor=0) ``` 减少所有计算机上多个 GPU 上的张量数据。 `tensor_list`中的每个张量应驻留在单独的 GPU 上 排名为`dst`的进程中只有`tensor_list[dst_tensor]`的 GPU 会收到最终结果。 当前仅支持 nccl 后端张量应仅是 GPU 张量 Parameters * **tensor_list** (_列表_ _[_ [_tensor_](tensors.html#torch.Tensor "torch.Tensor") _]_ )–输入和输出的 GPU 张量 集体。 该功能就地运行。 您还需要确保所有调用此函数的分布式进程的`len(tensor_list)`相同。 * **dst** (_python:int_) – Destination rank * **op** (_optional_) – One of the values from `torch.distributed.ReduceOp` enum. Specifies an operation used for element-wise reductions. * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op * **dst_tensor** (_python:int_ _,_ _可选_)– `tensor_list`中的目标张量等级 Returns 异步工作句柄(如果 async_op 设置为 True)。 无,否则 * * * ``` torch.distributed.all_gather_multigpu(output_tensor_lists, input_tensor_list, group=, async_op=False) ``` 在列表中收集整个组的张量。 `tensor_list`中的每个张量应驻留在单独的 GPU 上 Only nccl backend is currently supported tensors should only be GPU tensors Parameters * **output_tensor_lists** (_列表_ _[_ _列表_ _[_ [_Tensor_](tensors.html#torch.Tensor "torch.Tensor") [ _]_ _]_ )– 输出列表。 它应该在每个 GPU 上包含正确大小的张量,以用于集合的输出,例如 `output_tensor_lists[i]`包含位于`input_tensor_list[i]`的 GPU 上的 all_gather 结果。 请注意,`output_tensor_lists`的每个元素的大小均为`world_size * len(input_tensor_list)`,因为该函数都从组中的每个 GPU 收集结果。 要解释`output_tensor_lists[i]`的每个元素,请注意,排名为 k 的`input_tensor_list[j]`将出现在`output_tensor_lists[i][k * world_size + j]`中 还要注意,对于所有调用此函数的分布式进程,`len(output_tensor_lists)`和`output_tensor_lists`中每个元素的大小(每个元素是一个列表,因此`len(output_tensor_lists[i])`)必须相同。 * **input_tensor_list** (_列表_ _[_ [_tensor_](tensors.html#torch.Tensor "torch.Tensor") _]_ )–张量列表(不同) GPU)从当前进程中广播。 注意,对于所有调用此函数的分布式进程,`len(input_tensor_list)`必须相同。 * **group** (_ProcessGroup__,_ _optional_) – The process group to work on * **async_op** (_bool__,_ _optional_) – Whether this op should be an async op Returns Async work handle, if async_op is set to True. None, if not async_op or if not part of the group ## 启动实用程序 <cite>torch.distributed</cite> 程序包还在 <cite>torch.distributed.launch</cite> 中提供了启动实用程序。 此帮助程序实用程序可用于为每个节点启动多个进程以进行分布式训练。 该实用程序还支持 python2 和 python3。 ## Spawn 实用程序 [Multiprocessing 软件包-Torch.multiprocessing](multiprocessing.html#multiprocessing-doc) 软件包还在 [`torch.multiprocessing.spawn()`](multiprocessing.html#torch.multiprocessing.spawn "torch.multiprocessing.spawn") 中提供了`spawn`功能。 此辅助函数可用于产生多个进程。 它通过传入要运行的函数并产生 N 个进程来运行它而起作用。 这也可以用于多进程分布式训练。 有关如何使用它的参考,请参考 [PyTorch 示例-ImageNet 实现](https://github.com/pytorch/examples/tree/master/imagenet) 请注意,此功能需要 Python 3.4 或更高版本。