# 分布式 RPC 框架 > 链接:[`pytorch.org/docs/stable/rpc.html`](https://pytorch.org/docs/stable/rpc.html) 分布式 RPC 框架提供了多机模型训练的机制,通过一组原语允许远程通信,并提供一个更高级的 API 来自动区分跨多台机器分割的模型。 警告 RPC 包中的 API 是稳定的。有多个正在进行的工作项目来改进性能和错误处理,这些将在未来的版本中发布。 警告 CUDA 支持在 PyTorch 1.9 中引入,仍然是一个**beta**功能。RPC 包的并非所有功能都与 CUDA 支持兼容,因此不建议使用。这些不受支持的功能包括:RRefs、JIT 兼容性、分布式自动求导和分布式优化器,以及性能分析。这些缺陷将在未来的版本中得到解决。 注意 有关与分布式训练相关的所有功能的简要介绍,请参阅[PyTorch 分布式概述](https://pytorch.org/tutorials/beginner/dist_overview.html)。 ## 基础知识 分布式 RPC 框架使远程运行函数变得容易,支持引用远程对象而无需复制真实数据,并提供自动求导和优化器 API 以透明地在 RPC 边界上运行反向传播和更新参数。这些功能可以分为四组 API。 1. **远程过程调用(RPC)**支持在指定的目标工作进程上运行函数,并获取返回值或创建对返回值的引用。有三个主要的 RPC API:`rpc_sync()`(同步)、`rpc_async()`(异步)和`remote()`(异步并返回对远程返回值的引用)。如果用户代码不能在没有返回值的情况下继续进行,则使用同步 API。否则,使用异步 API 获取一个 future,并在调用者需要返回值时等待 future。当需求是远程创建某物但从不需要将其提取到调用者时,`remote()` API 是有用的。想象一下,驱动进程正在设置参数服务器和训练器。驱动程序可以在参数服务器上创建一个嵌入表,然后与训练器共享对嵌入表的引用,但本身永远不会在本地使用嵌入表。在这种情况下,`rpc_sync()`和`rpc_async()`不再适用,因为它们总是意味着返回值将立即或在将来返回给调用者。 1. **远程引用(RRef)**用作本地或远程对象的分布式共享指针。它可以与其他工作进程共享,并且引用计数将被透明处理。每个 RRef 只有一个所有者,对象只存在于该所有者上。持有 RRefs 的非所有者工作进程可以通过显式请求从所有者那里获取对象的副本。当工作进程需要访问某个数据对象,但本身既不是创建者(`remote()`的调用者)也不是对象的所有者时,这是很有用的。正如我们将在下面讨论的分布式优化器,是这种用例的一个例子。 1. **分布式自动微分**将所有参与前向传递的工作节点上的本地自动微分引擎连接在一起,并在后向传递期间自动到达它们以计算梯度。如果前向传递需要跨多台机器进行,例如进行分布式模型并行训练、参数服务器训练等,这将特别有帮助。有了这个功能,用户代码不再需要担心如何在 RPC 边界之间发送梯度以及本地自动微分引擎应该以哪种顺序启动,这在前向传递中存在嵌套和相互依赖的 RPC 调用时可能会变得非常复杂。 1. **分布式优化器**的构造函数接受一个`Optimizer()`(例如,`SGD()`、`Adagrad()`等)和参数 RRef 列表,为每个不同的 RRef 所有者创建一个`Optimizer()`实例,并在运行`step()`时相应地更新参数。当进行分布式前向和后向传递时,参数和梯度将分散在多个工作节点上,因此每个涉及的工作节点都需要一个优化器。分布式优化器将所有这些本地优化器包装在一起,并提供简洁的构造函数和`step()` API。 ## RPC 在使用 RPC 和分布式自动微分原语之前,必须进行初始化。要初始化 RPC 框架,我们需要使用`init_rpc()`,这将初始化 RPC 框架、RRef 框架和分布式自动微分。 ```py torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None) ``` 初始化 RPC 原语,如本地 RPC 代理和分布式自动微分,这将立即使当前进程准备好发送和接收 RPC。 参数 + **name**([*str*](https://docs.python.org/3/library/stdtypes.html#str "(在 Python v3.12 中)"))- 此节点的全局唯一名称。 (例如,`Trainer3`、`ParameterServer2`、`Master`、`Worker1`)名称只能包含数字、字母、下划线、冒号和/或破折号,并且必须少于 128 个字符。 + **backend**(*BackendType**,* *可选*) - RPC 后端实现的类型。支持的值是`BackendType.TENSORPIPE`(默认值)。有关更多信息,请参见后端。 + **rank**([*int*](https://docs.python.org/3/library/functions.html#int "(在 Python v3.12 中)"))- 此节点的全局唯一 id/排名。 + **world_size**([*int*](https://docs.python.org/3/library/functions.html#int "(在 Python v3.12 中)"))- 组中的工作节点数。 + **rpc_backend_options**(*RpcBackendOptions**,* *可选*) - 传递给 RpcAgent 构造函数的选项。它必须是`RpcBackendOptions`的特定于代理的子类,并包含特定于代理的初始化配置。默认情况下,对于所有代理,它将默认超时设置为 60 秒,并使用`init_method = "env://"`初始化底层进程组进行会合,这意味着环境变量`MASTER_ADDR`和`MASTER_PORT`需要正确设置。有关更多信息,请参见后端,查找可用的选项。 以下 API 允许用户远程执行函数以及创建对远程数据对象的引用(RRefs)。在这些 API 中,当将`Tensor`作为参数或返回值传递时,目标工作节点将尝试创建具有相同元数据(即形状、步幅等)的`Tensor`。我们有意禁止传输 CUDA 张量,因为如果源工作节点和目标工作节点上的设备列表不匹配,可能会导致崩溃。在这种情况下,应用程序始终可以在调用者上明确将输入张量移动到 CPU,并在必要时将其移动到被调用者上的所需设备。 警告 RPC 中的 TorchScript 支持是一个原型功能,可能会发生变化。自 v1.5.0 以来,`torch.distributed.rpc`支持将 TorchScript 函数作为 RPC 目标函数调用,这将有助于提高被调用者端的并行性,因为执行 TorchScript 函数不需要 GIL。 ```py torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0) ``` 在工作节点`to`上运行函数`func`的阻塞 RPC 调用。RPC 消息在执行 Python 代码的同时并行发送和接收。此方法是线程安全的。 参数 + **to** ([*str*](https://docs.python.org/3/library/stdtypes.html#str "(in Python v3.12)") *或* *WorkerInfo* *或* [*int*](https://docs.python.org/3/library/functions.html#int "(in Python v3.12)")) – 目标工作节点的名称/等级/`WorkerInfo`。 + **func** (*Callable*) – 一个可调用函数,例如 Python 可调用函数,内置运算符(例如`add()`)和带注释的 TorchScript 函数。 + **args** ([*tuple*](https://docs.python.org/3/library/stdtypes.html#tuple "(in Python v3.12)")) – 用于`func`调用的参数元组。 + **kwargs** ([*dict*](https://docs.python.org/3/library/stdtypes.html#dict "(in Python v3.12)")) – 是`func`调用的关键字参数字典。 + **timeout** ([*float*](https://docs.python.org/3/library/functions.html#float "(in Python v3.12)")*,* *可选*) – 用于此 RPC 的超时时间(以秒为单位)。如果 RPC 在此时间内未完成,将引发指示已超时的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,则使用在初始化期间或使用`_set_rpc_timeout`设置的默认值。 返回 返回运行`func`与`args`和`kwargs`的结果。 示例:: 确保在两个工作节点上正确设置`MASTER_ADDR`和`MASTER_PORT`。有关更多详细信息,请参考`init_process_group()` API。例如, export MASTER_ADDR=localhost export MASTER_PORT=5678 然后在两个不同的进程中运行以下代码: ```py >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown() ``` ```py >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() ``` 以下是使用 RPC 运行 TorchScript 函数的示例。 ```py >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) ``` ```py >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown() ``` ```py >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() ``` ```py torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0) ``` 在工作节点`to`上运行函数`func`的非阻塞 RPC 调用。RPC 消息在执行 Python 代码的同时并行发送和接收。此方法是线程安全的。此方法将立即返回一个可以等待的`Future`。 参数 + **to** ([*str*](https://docs.python.org/3/library/stdtypes.html#str "(in Python v3.12)") *或* *WorkerInfo* *或* [*int*](https://docs.python.org/3/library/functions.html#int "(in Python v3.12)")) – 目标工作节点的名称/等级/`WorkerInfo`。 + **func** (*Callable*) – 一个可调用函数,例如 Python 可调用函数,内置运算符(例如`add()`)和带注释的 TorchScript 函数。 + **args** ([*tuple*](https://docs.python.org/3/library/stdtypes.html#tuple "(in Python v3.12)")) – 用于`func`调用的参数元组。 + **kwargs** ([*dict*](https://docs.python.org/3/library/stdtypes.html#dict "(in Python v3.12)")) – 是`func`调用的关键字参数字典。 + **timeout** ([*float*](https://docs.python.org/3/library/functions.html#float "(in Python v3.12)")*,* *可选*) – 用于此 RPC 的超时时间(以秒为单位)。如果 RPC 在此时间内未完成,将引发指示已超时的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,则使用初始化期间设置的默认值或使用 `_set_rpc_timeout` 设置的默认值。 返回 返回一个 `Future` 对象,可以等待。完成后,可以从 `Future` 对象中检索 `func` 在 `args` 和 `kwargs` 上的返回值。 警告 不支持将 GPU 张量用作 `func` 的参数或返回值,因为我们不支持通过网络发送 GPU 张量。在将 GPU 张量用作 `func` 的参数或返回值之前,您需要显式将 GPU 张量复制到 CPU。 警告 `rpc_async` API 在将参数张量通过网络发送之前不会复制存储,这可能由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些张量的内容保持不变,直到返回的 `Future` 完成。 示例:: 确保在两个工作节点上正确设置 `MASTER_ADDR` 和 `MASTER_PORT`。有关更多详细信息,请参考 `init_process_group()` API。例如, 导出 MASTER_ADDR=localhost 导出 MASTER_PORT=5678 然后在两个不同的进程中运行以下代码: ```py >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown() ``` ```py >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() ``` 以下是使用 RPC 运行 TorchScript 函数的示例。 ```py >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) ``` ```py >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown() ``` ```py >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() ``` ```py torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0) ``` 在工作节点 `to` 上运行 `func` 并立即返回结果值的 `RRef`。工作节点 `to` 将是返回的 `RRef` 的所有者,调用 `remote` 的工作节点是用户。所有者管理其 `RRef` 的全局引用计数,只有当全局没有对其的活动引用时,所有者的 `RRef` 才会被销毁。 参数 + **to** ([*str*](https://docs.python.org/3/library/stdtypes.html#str "(in Python v3.12)") *或* *WorkerInfo* *或* [*int*](https://docs.python.org/3/library/functions.html#int "(in Python v3.12)")) – 目标工作节点的名称/等级/`WorkerInfo`。 + **func** (*Callable*) – 可调用函数,例如 Python 可调用函数、内置运算符(例如 `add()`)和带注释的 TorchScript 函数。 + **args** ([*tuple*](https://docs.python.org/3/library/stdtypes.html#tuple "(in Python v3.12)")) – 用于 `func` 调用的参数元组。 + **kwargs** ([*dict*](https://docs.python.org/3/library/stdtypes.html#dict "(in Python v3.12)")) – 是 `func` 调用的关键字参数字典。 + **timeout** ([*float*](https://docs.python.org/3/library/functions.html#float "(in Python v3.12)")*,* *可选*) – 此远程调用的超时时间(以秒为单位)。如果在此超时时间内在此工作节点上未成功处理对工作节点 `to` 上的此 `RRef` 的创建,则下次尝试使用 RRef(例如 `to_here()`)时,将引发超时,指示此失败。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,则使用初始化期间设置的默认值或使用 `_set_rpc_timeout` 设置的默认值。 返回 用户 `RRef` 实例的结果值。使用阻塞 API `torch.distributed.rpc.RRef.to_here()` 在本地检索结果值。 警告 `remote` API 在将参数张量发送到远程时不会复制存储,这可能由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些张量的内容保持不变,直到所有者确认返回的 RRef,可以使用`torch.distributed.rpc.RRef.confirmed_by_owner()` API 进行检查。 警告 `remote` API 的超时等错误是尽力处理的。这意味着当由`remote`发起的远程调用失败时,比如超时错误,我们会采取尽力处理错误的方法。这意味着错误会异步处理并设置在结果的 RRef 上。如果在此处理之前应用程序未使用 RRef(例如`to_here`或 fork 调用),则将适当引发`RRef`的错误。但是,用户应用程序可能在处理错误之前使用`RRef`。在这种情况下,错误可能不会被引发,因为它们尚未被处理。 示例: ```py Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly on both workers. Refer to :meth:`~torch.distributed.init_process_group` API for more details. For example, export MASTER_ADDR=localhost export MASTER_PORT=5678 Then run the following code in two different processes: >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() Below is an example of running a TorchScript function using RPC. >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() ``` ```py torch.distributed.rpc.get_worker_info(worker_name=None) ``` 获取给定工作进程名称的`WorkerInfo`。使用此`WorkerInfo`以避免在每次调用时传递昂贵的字符串。 参数 **worker_name**([*str*](https://docs.python.org/3/library/stdtypes.html#str "(in Python v3.12)")) - 工作进程的字符串名称。如果为`None`,则返回当前工作进程的 ID。(默认为`None`) 返回 给定`worker_name`的`WorkerInfo`实例或当前工作进程的`WorkerInfo`如果`worker_name`为`None`。 ```py torch.distributed.rpc.shutdown(graceful=True, timeout=0) ``` 执行 RPC 代理的关闭,然后销毁 RPC 代理。这将停止本地代理接受未完成的请求,并通过终止所有 RPC 线程关闭 RPC 框架。如果`graceful=True`,这将阻塞,直到所有本地和远程 RPC 进程到达此方法并等待所有未完成的工作完成。否则,如果`graceful=False`,这是一个本地关闭,不会等待其他 RPC 进程到达此方法。 警告 对于由`rpc_async()`返回的`Future`对象,在`shutdown()`之后不应调用`future.wait()`。 参数 **graceful**([*bool*](https://docs.python.org/3/library/functions.html#bool "(in Python v3.12)")) - 是否进行优雅关闭。如果为 True,这将 1)等待`UserRRefs`没有挂起的系统消息并删除它们;2)阻塞,直到所有本地和远程 RPC 进程到达此方法并等待所有未完成的工作完成。 示例:: 确保在两个工作进程上正确设置`MASTER_ADDR`和`MASTER_PORT`。有关更多详细信息,请参考`init_process_group()` API。例如, export MASTER_ADDR=localhost export MASTER_PORT=5678 然后在两个不同的进程中运行以下代码: ```py >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown() ``` ```py >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown() ``` ```py class torch.distributed.rpc.WorkerInfo ``` 封装系统中工作进程信息的结构。包含工作进程的名称和 ID。此类不应直接构造,而是可以通过`get_worker_info()`检索实例,并将结果传递给函数,如`rpc_sync()`、`rpc_async()`、`remote()`以避免在每次调用时复制字符串。 ```py property id ``` 用于标识工作进程的全局唯一 ID。 ```py property name ``` 工作进程的名称。 RPC 包还提供了装饰器,允许应用程序指定在被调用方如何处理给定函数。 ```py torch.distributed.rpc.functions.async_execution(fn) ``` 一个函数的装饰器,指示函数的返回值保证是一个`Future`对象,并且此函数可以在 RPC 被调用方异步运行。更具体地说,被调用方提取由包装函数返回的`Future`并将后续处理步骤安装为该`Future`的回调。安装的回调将在完成时从`Future`中读取值并将该值作为 RPC 响应发送回去。这也意味着返回的`Future`仅存在于被调用方,并且永远不会通过 RPC 发送。当包装函数的执行需要暂停和恢复时,例如包含`rpc_async()`或等待其他信号时,此装饰器非常有用。 注意 为了启用异步执行,应用程序必须将此装饰器返回的函数对象传递给 RPC API。如果 RPC 检测到此装饰器安装的属性,它会知道此函数返回一个`Future`对象,并相应处理。但是,这并不意味着在定义函数时此装饰器必须是最外层的。例如,与`@staticmethod`或`@classmethod`结合时,`@rpc.functions.async_execution`需要是内部装饰器,以允许目标函数被识别为静态或类函数。这个目标函数仍然可以异步执行,因为当访问时,静态或类方法保留了由`@rpc.functions.async_execution`安装的属性。 示例:: 返回的`Future`对象可以来自`rpc_async()`、`then()`或`Future`构造函数。下面的示例展示了直接使用由`then()`返回的`Future`。 ```py >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @rpc.functions.async_execution >>> def async_add_chained(to, x, y, z): >>> # This function runs on "worker1" and returns immediately when >>> # the callback is installed through the `then(cb)` API. In the >>> # mean time, the `rpc_async` to "worker2" can run concurrently. >>> # When the return value of that `rpc_async` arrives at >>> # "worker1", "worker1" will run the lambda function accordingly >>> # and set the value for the previously returned `Future`, which >>> # will then trigger RPC to send the result back to "worker0". >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add_chained, >>> args=("worker2", torch.ones(2), 1, 1) >>> ) >>> print(ret) # prints tensor([3., 3.]) ``` 与 TorchScript 装饰器结合时,此装饰器必须是最外层的。 ```py >>> from torch import Tensor >>> from torch.futures import Future >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @torch.jit.script >>> def script_add(x: Tensor, y: Tensor) -> Tensor: >>> return x + y >>> >>> @rpc.functions.async_execution >>> @torch.jit.script >>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]: >>> return rpc.rpc_async(to, script_add, (x, y)) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add, >>> args=("worker2", torch.ones(2), 1) >>> ) >>> print(ret) # prints tensor([2., 2.]) ``` 与静态或类方法结合时,此装饰器必须是内部装饰器。 ```py >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> class AsyncExecutionClass: >>> >>> @staticmethod >>> @rpc.functions.async_execution >>> def static_async_add(to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> @classmethod >>> @rpc.functions.async_execution >>> def class_async_add(cls, to, x, y, z): >>> ret_fut = torch.futures.Future() >>> rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: ret_fut.set_result(fut.wait() + z) >>> ) >>> return ret_fut >>> >>> @rpc.functions.async_execution >>> def bound_async_add(self, to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.static_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) >>> >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.class_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) ``` 此装饰器还与 RRef 助手一起使用,即。`torch.distributed.rpc.RRef.rpc_sync()`、`torch.distributed.rpc.RRef.rpc_async()`和`torch.distributed.rpc.RRef.remote()`。 ```py >>> from torch.distributed import rpc >>> >>> # reuse the AsyncExecutionClass class above >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2) >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait() >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here() >>> print(ret) # prints tensor([4., 4.]) ``` ### 后端 RPC 模块可以利用不同的后端来执行节点之间的通信。要使用的后端可以在`init_rpc()`函数中指定,通过传递`BackendType`枚举的某个值。无论使用什么后端,RPC API 的其余部分都不会改变。每个后端还定义了自己的`RpcBackendOptions`类的子类,该类的实例也可以传递给`init_rpc()`以配置后端的行为。 ```py class torch.distributed.rpc.BackendType(value) ``` 可用后端的枚举类。 PyTorch 内置了`BackendType.TENSORPIPE`后端。可以使用`register_backend()`函数注册其他后端。 ```py class torch.distributed.rpc.RpcBackendOptions ``` 封装传递给 RPC 后端的选项的抽象结构。可以将此类的实例传递给`init_rpc()`以使用特定配置初始化 RPC,例如 RPC 超时和要使用的`init_method`。 ```py property init_method ``` 指定如何初始化进程组的 URL。默认为`env://`。 ```py property rpc_timeout ``` 一个浮点数,表示用于所有 RPC 的超时时间。如果一个 RPC 在这个时间段内没有完成,它将以超时的异常完成。 #### TensorPipe 后端 TensorPipe 代理是默认的,利用了[the TensorPipe library](https://github.com/pytorch/tensorpipe),它提供了一种专门适用于机器学习的本地点对点通信原语,从根本上解决了 Gloo 的一些限制。与 Gloo 相比,它的优势在于是异步的,这允许大量的传输同时进行,每个传输以自己的速度进行,而不会相互阻塞。它只在需要时按需在节点对之间打开管道,当一个节点失败时,只有它的相关管道将被关闭,而所有其他管道将继续正常工作。此外,它能够支持多种不同的传输方式(TCP,当然,还有共享内存,NVLink,InfiniBand 等),并且可以自动检测它们的可用性并协商用于每个管道的最佳传输方式。 TensorPipe 后端已经在 PyTorch v1.6 中引入,并正在积极开发中。目前,它仅支持 CPU 张量,GPU 支持即将到来。它配备了基于 TCP 的传输,就像 Gloo 一样。它还能够自动对大张量进行分块和多路复用,以实现非常高的带宽。代理将能够自行选择最佳传输方式,无需干预。 示例: ```py >>> import os >>> from torch.distributed import rpc >>> os.environ['MASTER_ADDR'] = 'localhost' >>> os.environ['MASTER_PORT'] = '29500' >>> >>> rpc.init_rpc( >>> "worker1", >>> rank=0, >>> world_size=2, >>> rpc_backend_options=rpc.TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> rpc_timeout=20 # 20 second timeout >>> ) >>> ) >>> >>> # omitting init_rpc invocation on worker2 ``` ```py class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None) ``` `TensorPipeAgent`的后端选项,派生自`RpcBackendOptions`。 参数 + **num_worker_threads**([*int*](https://docs.python.org/3/library/functions.html#int "(in Python v3.12)")*,* *可选*) – `TensorPipeAgent`用于执行请求的线程池中的线程数(默认值:16)。 + **rpc_timeout**([*float*](https://docs.python.org/3/library/functions.html#float "(in Python v3.12)")*,* *可选*) – RPC 请求的默认超时时间,以秒为单位(默认值:60 秒)。如果 RPC 在此时间段内未完成,将引发指示的异常。调用者可以在需要时在`rpc_sync()`和`rpc_async()`中为单独的 RPC 覆盖此超时。 + **init_method**([*str*](https://docs.python.org/3/library/stdtypes.html#str "(in Python v3.12)")*,* *可选*) – 用于初始化用于会合的分布式存储的 URL。它接受与`init_process_group()`的相同参数的任何值(默认值:`env://`)。 + **device_maps**(*Dict**[*[*str*](https://docs.python.org/3/library/stdtypes.html#str "(in Python v3.12)")*,* *Dict**]**,* *可选*) – 从此工作器到被调用者的设备放置映射。键是被调用者工作器名称,值是字典(`Dict` of `int`, `str`, or `torch.device`),将此工作器的设备映射到被调用者的设备。(默认值:`None`) + **devices**(List[int, str, or `torch.device`],可选)– RPC 代理使用的所有本地 CUDA 设备。默认情况下,它将被初始化为来自其自身`device_maps`和对等方`device_maps`的所有本地设备。在处理 CUDA RPC 请求时,代理将为此`List`中的所有设备正确同步 CUDA 流。 ```py property device_maps ``` 设备映射位置。 ```py property devices ``` 本地代理使用的所有设备。 ```py property init_method ``` 指定如何初始化进程组的 URL。默认为`env://` ```py property num_worker_threads ``` `TensorPipeAgent`用于执行请求的线程池中的线程数。 ```py property rpc_timeout ``` 指示用于所有 RPC 的超时的浮点数。如果 RPC 在此时间段内未完成,它将以超时的异常完成。 ```py set_device_map(to, device_map) ``` 设置每个 RPC 调用者和被调用者对之间的设备映射。此函数可以多次调用以逐步添加设备放置配置。 参数 + **to**([*str*](https://docs.python.org/3/library/stdtypes.html#str "(in Python v3.12)"))- 被调用者名称。 + **device_map**(*Dict* *of* [*int*](https://docs.python.org/3/library/functions.html#int "(in Python v3.12)")*,* [*str*](https://docs.python.org/3/library/stdtypes.html#str "(in Python v3.12)")*, 或* *torch.device*)- 从此工作器到被调用者的设备放置映射。此映射必须是可逆的。 示例 ```py >>> # both workers >>> def add(x, y): >>> print(x) # tensor([1., 1.], device='cuda:1') >>> return x + y, (x + y).to(2) >>> >>> # on worker 0 >>> options = TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> device_maps={"worker1": {0: 1}} >>> # maps worker0's cuda:0 to worker1's cuda:1 >>> ) >>> options.set_device_map("worker1", {1: 2}) >>> # maps worker0's cuda:1 to worker1's cuda:2 >>> >>> rpc.init_rpc( >>> "worker0", >>> rank=0, >>> world_size=2, >>> backend=rpc.BackendType.TENSORPIPE, >>> rpc_backend_options=options >>> ) >>> >>> x = torch.ones(2) >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1)) >>> # The first argument will be moved to cuda:1 on worker1\. When >>> # sending the return value back, it will follow the invert of >>> # the device map, and hence will be moved back to cuda:0 and >>> # cuda:1 on worker0 >>> print(rets[0]) # tensor([2., 2.], device='cuda:0') >>> print(rets[1]) # tensor([2., 2.], device='cuda:1') ``` ```py set_devices(devices) ``` 设置 TensorPipe RPC 代理使用的本地设备。在处理 CUDA RPC 请求时,TensorPipe RPC 代理将为此`List`中所有设备适当同步 CUDA 流。 参数 **devices**(*List* *of* [*int*](https://docs.python.org/3/library/functions.html#int "(in Python v3.12)")*,* [*str*](https://docs.python.org/3/library/stdtypes.html#str "(in Python v3.12)")*, 或* *torch.device*)- TensorPipe RPC 代理使用的本地设备。 注意 RPC 框架不会自动重试任何`rpc_sync()`、`rpc_async()`和`remote()`调用。原因是 RPC 框架无法确定操作是否幂等,以及是否安全重试。因此,应用程序有责任处理失败并在必要时重试。RPC 通信基于 TCP,因此可能由于网络故障或间歇性网络连接问题而发生故障。在这种情况下,应用程序需要适当地重试,以合理的退避时间确保网络不会被过于激进的重试所压倒。## RRef 警告 在使用 CUDA 张量时,目前不支持 RRefs `RRef`(远程引用)是对远程工作器上某种类型`T`(例如`Tensor`)的值的引用。此句柄在所有者上保持引用的远程值保持活动状态,但并不意味着该值将来会传输到本地工作器。RRefs 可以通过持有对其他工作器上存在的[nn.Modules](https://pytorch.org/docs/stable/nn.html#torch.nn.Module)的引用,在多机训练中使用,并调用适当的函数在训练期间检索或修改它们的参数。有关更多详细信息,请参见远程引用协议。 ```py class torch.distributed.rpc.PyRRef(RRef) ``` 封装对远程工作器上某种类型值的引用的类。此句柄将保持工作器上引用的远程值保持活动状态。当`UserRRef`被删除时,1)在应用程序代码和本地 RRef 上没有对它的引用,或 2)应用程序已调用了优雅关闭时,`UserRRef`将被删除。在已删除的 RRef 上调用方法会导致未定义的行为。RRef 实现仅提供尽力检测错误,应用程序不应在`rpc.shutdown()`之后使用`UserRRefs`。 警告 RRefs 只能由 RPC 模块序列化和反序列化。在没有 RPC 的情况下序列化和反序列化 RRefs(例如,Python pickle,torch `save()` / `load()`,JIT `save()` / `load()`等)会导致错误。 参数 + **value**([*object*](https://docs.python.org/3/library/functions.html#object "(in Python v3.12)"))- 要由此 RRef 包装的值。 + **type_hint**(*Type**,* *可选*)- 应传递给`TorchScript`编译器作为`value`的类型提示的 Python 类型。 示例: 出于简单起见,以下示例跳过了 RPC 初始化和关闭代码。有关详细信息,请参阅 RPC 文档。 1. 使用 rpc.remote 创建一个 RRef ```py >>> import torch >>> import torch.distributed.rpc as rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> # get a copy of value from the RRef >>> x = rref.to_here() ``` 1. 从本地对象创建一个 RRef ```py >>> import torch >>> from torch.distributed.rpc import RRef >>> x = torch.zeros(2, 2) >>> rref = RRef(x) ``` 1. 与其他工作人员共享一个 RRef ```py >>> # On both worker0 and worker1: >>> def f(rref): >>> return rref.to_here() + 1 ``` ```py >>> # On worker0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> rref = RRef(torch.zeros(2, 2)) >>> # the following RPC shares the rref with worker1, reference >>> # count is automatically updated. >>> rpc.rpc_sync("worker1", f, args=(rref,)) ``` ```py backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) → None ``` > 使用 RRef 作为反向传递的根运行反向传递。如果提供了`dist_autograd_ctx_id`,我们将使用提供的 ctx_id 从 RRef 的所有者开始执行分布式反向传递。在这种情况下,应使用`get_gradients()`来检索梯度。如果`dist_autograd_ctx_id`为`None`,则假定这是一个本地自动梯度图,我们只执行本地反向传递。在本地情况下,调用此 API 的节点必须是 RRef 的所有者。预期 RRef 的值是标量张量。 参数 + **dist_autograd_ctx_id**([*int*](https://docs.python.org/3/library/functions.html#int "(in Python v3.12)")*,* *可选*)- 我们应该检索梯度的分布式自动梯度上下文 id(默认值:-1)。 + **retain_graph**([*bool*](https://docs.python.org/3/library/functions.html#bool "(in Python v3.12)")*,* *可选*)- 如果为`False`,用于计算梯度的图将被释放。请注意,在几乎所有情况下,将此选项设置为`True`是不需要的,并且通常可以以更高效的方式解决。通常,您需要将其设置为`True`以多次运行反向(默认值:False)。 示例: ```py >>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> rref.backward(context_id) ``` ```py confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) → bool ``` 返回此`RRef`是否已被所有者确认。`OwnerRRef`始终返回 true,而`UserRRef`仅在所有者知道此`UserRRef`时返回 true。 ```py is_owner(self: torch._C._distributed_rpc.PyRRef) → bool ``` 返回当前节点是否是此`RRef`的所有者。 ```py local_value(self: torch._C._distributed_rpc.PyRRef) → object ``` 如果当前节点是所有者,则返回对本地值的引用。否则,抛出异常。 ```py owner(self: torch._C._distributed_rpc.PyRRef) → torch._C._distributed_rpc.WorkerInfo ``` 返回拥有此`RRef`的节点的工作人员信息。 ```py owner_name(self: torch._C._distributed_rpc.PyRRef) → str ``` 返回拥有此`RRef`的节点的工作人员名称。 ```py remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) → object ``` 创建一个辅助代理,以便使用拥有 RRef 的所有者作为目标轻松启动`remote`,以在此 RRef 引用的对象上运行函数。更具体地说,`rref.remote().func_name(*args, **kwargs)`等同于以下内容: ```py >>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs)) ``` 参数 **timeout**([*float*](https://docs.python.org/3/library/functions.html#float "(in Python v3.12)")*,* *可选*)- `rref.remote()`的超时时间。如果在超时时间内未成功完成此`RRef`的创建,则下次尝试使用 RRef(例如`to_here`)时将引发超时。如果未提供,将使用默认的 RPC 超时时间。有关`RRef`的特定超时语义,请参见`rpc.remote()`。 示例: ```py >>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.remote().size().to_here() # returns torch.Size([2, 2]) >>> rref.remote().view(1, 4).to_here() # returns tensor([[1., 1., 1., 1.]]) ``` ```py rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) → object ``` 创建一个辅助代理,以便使用拥有 RRef 的所有者作为目标轻松启动`rpc_async`,以在此 RRef 引用的对象上运行函数。更具体地说,`rref.rpc_async().func_name(*args, **kwargs)`等同于以下内容: ```py >>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs)) ``` 参数 **timeout**([*float*](https://docs.python.org/3/library/functions.html#float "(in Python v3.12)")*,* *可选*)- `rref.rpc_async()`的超时时间。如果在此时间范围内调用未完成,将引发指示的异常。如果未提供此参数,则将使用默认的 RPC 超时时间。 示例:: ```py >>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_async().size().wait() # returns torch.Size([2, 2]) >>> rref.rpc_async().view(1, 4).wait() # returns tensor([[1., 1., 1., 1.]]) ``` ```py rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) → object ``` 创建一个辅助代理,以便使用 RRef 的所有者轻松启动`rpc_sync`,以运行此 RRef 引用的对象上的函数。更具体地说,`rref.rpc_sync().func_name(*args, **kwargs)`等同于以下内容: ```py >>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs)) ``` 参数 **timeout** ([*float*](https://docs.python.org/3/library/functions.html#float "(在 Python v3.12 中)")*,* *optional*) – `rref.rpc_sync()`的超时时间。如果调用在此时间范围内未完成,将引发指示的异常。如果未提供此参数,则将使用默认的 RPC 超时时间。 示例:: ```py >>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_sync().size() # returns torch.Size([2, 2]) >>> rref.rpc_sync().view(1, 4) # returns tensor([[1., 1., 1., 1.]]) ``` ```py to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) → object ``` 阻塞调用,将 RRef 的值从所有者复制到本地节点并返回。如果当前节点是所有者,则返回对本地值的引用。 参数 **timeout** ([*float*](https://docs.python.org/3/library/functions.html#float "(在 Python v3.12 中)")*,* *optional*) – `to_here`的超时时间。如果调用在此时间范围内未完成,将引发指示的异常。如果未提供此参数,则将使用默认的 RPC 超时时间(60 秒)。 有关 RRef 的更多信息 + 远程引用协议 + 背景 + 假设 + RRef 生命周期 + 设计原理 + 实现 + 协议场景 + 用户与所有者共享 RRef 作为返回值 + 用户与所有者共享 RRef 作为参数 + 所有者与用户共享 RRef + 用户与用户共享 RRef ## RemoteModule[](#remotemodule "Permalink to this heading") 警告 当前不支持使用 CUDA 张量时,RemoteModule `RemoteModule`是在不同进程上轻松创建 nn.Module 的一种方式。实际模块驻留在远程主机上,但本地主机具有对此模块的句柄,并且可以像常规的 nn.Module 一样调用此模块。但是,调用会导致 RPC 调用到远程端,并且如果需要,可以通过 RemoteModule 支持的其他 API 以异步方式执行。 ```py class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs) ``` > 只有在 RPC 初始化后才能创建 RemoteModule 实例。 > > 它在指定的远程节点上创建一个用户指定的模块。它的行为类似于常规的`nn.Module`,只是`forward`方法在远程节点上执行。它负责自动求导记录,以确保反向传播梯度传播回相应的远程模块。 > > 它根据`module_cls`的`forward`方法的签名生成两个方法`forward_async`和`forward`。`forward_async`以异步方式运行并返回一个 Future。`forward_async`和`forward`的参数与由`module_cls`返回的模块的`forward`方法相同。 > > 例如,如果`module_cls`返回`nn.Linear`的实例,该实例具有`forward`方法签名:`def forward(input: Tensor) -> Tensor:`,生成的`RemoteModule`将具有 2 个带有签名的方法: > > `def forward(input: Tensor) -> Tensor:``def forward_async(input: Tensor) -> Future[Tensor]:` 参数 + **remote_device** ([*str*](https://docs.python.org/3/library/stdtypes.html#str "(在 Python v3.12 中)")) – 我们希望将此模块放置在目标工作节点上的设备。格式应为“/”,其中设备字段可以解析为 torch.device 类型。例如,“trainer0/cpu”,“trainer0”,“ps0/cuda:0”。此外,设备字段可以是可选的,默认值为“cpu”。 + **module_cls** (*nn.Module*) – 用于在远程创建的模块的类。例如, ```py >>> class MyModule(nn.Module): >>> def forward(input): >>> return input + 1 >>> >>> module_cls = MyModule ``` + **args** (*Sequence**,* *optional*) – 传递给`module_cls`的参数。 + **kwargs** (*Dict**,* *optional*) – 传递给`module_cls`的关键字参数。 返回 一个远程模块实例,包装了用户提供的 `module_cls` 创建的 `Module`,它具有阻塞的 `forward` 方法和一个异步的 `forward_async` 方法,返回用户提供模块在远程端的 `forward` 调用的 future。 示例:: 在两个不同的进程中运行以下代码: ```py >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch import nn, Tensor >>> from torch.distributed.nn.api.remote_module import RemoteModule >>> >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> remote_linear_module = RemoteModule( >>> "worker1/cpu", nn.Linear, args=(20, 30), >>> ) >>> input = torch.randn(128, 20) >>> ret_fut = remote_linear_module.forward_async(input) >>> ret = ret_fut.wait() >>> rpc.shutdown() ``` ```py >>> # On worker 1: >>> import torch >>> import torch.distributed.rpc as rpc >>> >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() ``` 此外,可以在[分布式数据并行](https://pytorch.org/docs/stable/nn.html#torch.nn.parallel.DistributedDataParallel)(DDP)中结合更实际的示例,详见此[教程](https://pytorch.org/tutorials/advanced/rpc_ddp_tutorial.html)。 ```py get_module_rref() ``` 返回一个指向远程模块的 `RRef`(`RRef[nn.Module]`)。 返回类型 *RRef*[*Module*] ```py remote_parameters(recurse=True) ``` 返回一个指向远程模块参数的 `RRef` 列表。 这通常可以与 `DistributedOptimizer` 结合使用。 参数 **recurse**([*bool*](https://docs.python.org/3/library/functions.html#bool "(在 Python v3.12 中)"))- 如果为 True,则返回远程模块及其所有子模块的参数。否则,仅返回远程模块的直接成员参数。 返回 一个远程模块参数的 `RRef`(`List[RRef[nn.Parameter]]`)列表。 返回类型 [*List*](https://docs.python.org/3/library/typing.html#typing.List "(在 Python v3.12 中)")*RRef*[[*Parameter*]] ## 分布式自动求导框架[](#distributed-autograd-framework "跳转到此标题") 警告 当使用 CUDA 张量时,不支持分布式自动求导 这个模块提供了一个基于 RPC 的分布式自动求导框架,可用于诸如模型并行训练等应用。简而言之,应用程序可以通过 RPC 发送和接收梯度记录张量。在前向传播中,当梯度记录张量通过 RPC 发送时,我们记录下来;在反向传播过程中,我们使用这些信息来使用 RPC 执行分布式反向传播。更多细节请参见分布式自动求导设计。 ```py torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) → None ``` 使用提供的根启动分布式反向传播。目前实现了 FAST mode 算法,该算法假定在同一分布式自动求导上下文中的所有 RPC 消息在反向传播过程中都将成为自动求导图的一部分。 我们使用提供的根来发现自动求导图并计算适当的依赖关系。此方法会阻塞,直到整个自动求导计算完成。 我们在每个节点的适当 `torch.distributed.autograd.context` 中累积梯度。要使用的自动求导上下文是根据传入的 `context_id` 查找的,当调用 `torch.distributed.autograd.backward()` 时传入。如果没有与给定 ID 对应的有效自动求导上下文,我们会抛出错误。您可以使用 `get_gradients()` API 检索累积的梯度。 参数 + **context_id**([*int*](https://docs.python.org/3/library/functions.html#int "(在 Python v3.12 中)"))- 我们应该检索梯度的自动求导上下文 id。 + **roots**([*list*](https://docs.python.org/3/library/stdtypes.html#list "(在 Python v3.12 中)"))- 代表自动求导计算的根的张量。所有张量应为标量。 + **retain_graph**(*bool*,可选) - 如果为 False,则用于计算梯度的图将被释放。请注意,在几乎所有情况下,将此选项设置为 True 是不需要的,并且通常可以以更高效的方式解决。通常,您需要将其设置为 True 以多次运行反向传递。 示例:: ```py >>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> pred = model.forward() >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss) ``` ```py class torch.distributed.autograd.context ``` 在使用分布式自动求导时,用于包装前向和后向传递的上下文对象。在`with`语句中生成的`context_id`用于唯一标识所有工作进程上的分布式后向传递。每个工作进程存储与此`context_id`相关的元数据,这是正确执行分布式自动求导传递所必需的。 示例:: ```py >>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >>> dist_autograd.backward(context_id, [loss]) ``` ```py torch.distributed.autograd.get_gradients(context_id: int) → Dict[Tensor, Tensor] ``` 从张量到在分布式自动求导后向传递的提供的上下文中累积的该张量的适当梯度的映射中检索映射,对应于给定的`context_id`。 参数 **context_id**(*int*) - 我们应该检索与给定“context_id”对应的上下文中累积的该张量的适当梯度的张量映射。 返回 一个映射,其中键是张量,值是该张量的相关梯度。 示例:: ```py >>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = t1 + t2 >>> dist_autograd.backward(context_id, [loss.sum()]) >>> grads = dist_autograd.get_gradients(context_id) >>> print(grads[t1]) >>> print(grads[t2]) ``` 有关 RPC 自动求导的更多信息 + 分布式自动求导设计 + 背景 + 前向传递期间的自动求导记录 + 分布式自动求导上下文 + 分布式后向传递 + 计算依赖关系 + FAST 模式算法 + SMART 模式算法 + 分布式优化器 + 简单的端到端示例 ## 分布式优化器 请参阅[torch.distributed.optim](https://pytorch.org/docs/main/distributed.optim.html)页面,了解有关分布式优化器的文档。 ## 设计说明 分布式自动求导设计说明涵盖了基于 RPC 的分布式自动求导框架的设计,对于模型并行训练等应用非常有用。 + 分布式自动求导设计 RRef 设计说明涵盖了 RRef(远程引用)协议的设计,该协议用于通过框架引用远程工作进程上的值。 + 远程引用协议 ## 教程 RPC 教程介绍了用户如何使用 RPC 框架,提供了几个示例应用程序,使用 torch.distributed.rpc API,并演示如何使用分析器来分析基于 RPC 的工作负载。 + 使用分布式 RPC 框架入门 + [使用分布式 RPC 框架实现参数服务器](https://pytorch.org/tutorials/intermediate/rpc_param_server_tutorial.html) + [将分布式数据并行与分布式 RPC 框架结合使用](https://pytorch.org/tutorials/advanced/rpc_ddp_tutorial.html)(涵盖**RemoteModule**) + [基于 RPC 的工作负载分析](https://pytorch.org/tutorials/recipes/distributed_rpc_profiling.html) + [实现批处理 RPC 处理](https://pytorch.org/tutorials/intermediate/rpc_async_execution.html) + [分布式管道并行](https://pytorch.org/tutorials/intermediate/dist_pipeline_parallel_tutorial.html)