# 分布式 RPC 框架 > 原文: [https://pytorch.org/docs/stable/rpc.html](https://pytorch.org/docs/stable/rpc.html) 分布式 RPC 框架通过一组原语提供了用于多机器模型训练的机制,以允许进行远程通信;还提供了高级 API,以自动区分在多台机器之间划分的模型。 警告 RPC API 是试验性的,随时可能更改。 ## RPC 和 RRef 框架 在使用 RPC 和分布式 autograd 原语之前,必须进行初始化。 要初始化 RPC 框架,我们需要使用 [`init_rpc()`](#torch.distributed.rpc.init_rpc "torch.distributed.rpc.init_rpc") 来初始化 RPC 框架,RRef 框架和分布式 autograd。 默认情况下,这还将初始化 <cite>ProcessGroup</cite> ([`init_process_group()`](distributed.html#torch.distributed.init_process_group "torch.distributed.init_process_group"))后端,以进行 RPC 通信。 <cite>ProcessGroup</cite> 后端在内部使用 gloo 进行通信。 * * * ``` torch.distributed.rpc.init_rpc(name, backend=BackendType.PROCESS_GROUP, rank=-1, world_size=None, rpc_backend_options=None) ``` 初始化 RPC 原语,例如本地 RPC 代理和分布式 autograd。 初始化本地 RPC 代理,该代理立即使当前进程准备好发送和接收 RPC。 此方法还可以正确初始化使用 gloo 进行集体通信的默认进程组后端。 参数 * **后端**(_枚举_)– RPC 后端实现的类型。 当前,进程组后端是唯一可用的后端实现。 (默认:`RpcBackend.PROCESS_GROUP`)。 * **名称** (_str_ )–此节点的全局唯一名称。 (例如`Trainer3`,`ParameterServer2`,`Master`和`Worker1`)名称只能包含数字,字母,下划线和/或破折号,并且必须少于 128 个字符。 * **等级** (_python:int_ )–此节点的全局唯一 ID /等级。 * **world_size** (_python:int_ )–组中的工人数。 * **rpc_backend_options** (_RpcBackendOptions_ )–传递给 RpcAgent 构造函数的选项。 ## 参考 <cite>RRef</cite> (远程引用)是对远程工作人员上某个类型 <cite>T</cite> (例如<cite>张量</cite>)的值的引用。 该句柄使引用的远程值在所有者上保持活动状态,但不暗示该值将来会转移给本地工作人员。 通过保留对其他工作人员中存在的 [nn.Modules](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) 的引用,并在训练期间调用适当的函数来检索或修改其参数,可以将 RRef 用于多机训练。 有关更多详细信息,请参见[远程参考协议](notes/rref.html#remote-reference-protocol)。 * * * ``` class torch.distributed.rpc.RRef ``` 在远程工作器上封装对某个类型的值的引用的类。 该句柄将使引用的远程值在工作程序上保持活动状态。 * * * ``` is_owner(self: torch.distributed.rpc.RRef) → bool ``` 返回当前节点是否是此`RRef`的所有者。 * * * ``` local_value(self: torch.distributed.rpc.RRef) → object ``` 如果当前节点是所有者,则返回对本地值的引用。 否则,引发异常。 * * * ``` owner(self: torch.distributed.rpc.RRef) → torch.distributed.rpc.WorkerInfo ``` 返回拥有此`RRef`的节点的工作程序信息。 * * * ``` to_here(self: torch.distributed.rpc.RRef) → object ``` 将 RRef 的值从所有者复制到本地节点并返回它的阻塞调用。 如果当前节点是所有者,则返回对本地值的引用。 ## RPC 和 RRef 原语 该库提供了原语,允许用户创建和修改对远程数据的引用(RRef)以及远程执行功能。 * * * ``` torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None) ``` 进行 RPC 阻塞调用以在 worker `to`上运行函数`func`。 RPC 消息的发送和接收与 Python 代码的执行并行。 此方法是线程安全的。 Parameters * **到** (_str_ _或_ _WorkerInfo_ )–目标工作线程的 ID 或名称。 * **函数**(可调用_的_)–任何可调用的函数。 内置函数(例如 [`torch.add()`](torch.html#torch.add "torch.add"))可以更有效地通过 RPC 发送。 * **args** (_元组_)– `func`调用的参数元组。 * **kwargs** (_dict_ )–是`func`调用的关键字参数的字典。 退货 返回在`args`和`kwargs`上运行`func`的结果。 例: ``` On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown() On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() ``` * * * ``` torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None) ``` 进行非阻塞 RPC 调用以在 worker `to`上运行函数`func`。 RPC 消息的发送和接收与 Python 代码的执行并行。 此方法是线程安全的。 此方法将立即返回可以等待的`torch.distributed.FutureMessage`。 Parameters * **to** (_str_ _or_ _WorkerInfo_) – id or name of the destination worker. * **func** (_callable_) – any callable function. builtin functions (like [`torch.add()`](torch.html#torch.add "torch.add")) can be sent over RPC more efficiently. * **args** (_tuple_) – the argument tuple for the `func` invocation. * **kwargs** (_dict_) – is a dictionary of keyword arguments for the `func` invocation. Returns 返回可以等待的`torch.distributed.FutureMessage`对象。 完成后,可以从`FutureMessage`对象中检索`args`和`kwargs`上`func`的返回值。 Example: ``` On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown() On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() ``` * * * ``` torch.distributed.rpc.remote(to, func, args=None, kwargs=None) ``` 进行远程调用以在工作线程`to`上运行`func`,并立即将 [`RRef`](#torch.distributed.rpc.RRef "torch.distributed.rpc.RRef") 返回到结果值。 工人`to`将是返回的 [`RRef`](#torch.distributed.rpc.RRef "torch.distributed.rpc.RRef") 的所有者,而调用`remote`的工人是用户。 所有者管理其 [`RRef`](#torch.distributed.rpc.RRef "torch.distributed.rpc.RRef") 的全局引用计数,而所有者 [`RRef`](#torch.distributed.rpc.RRef "torch.distributed.rpc.RRef") 仅在全局上没有活动引用时被销毁。 Parameters * **to** (_str_ _or_ _WorkerInfo_) – id or name of the destination worker. * **函数**(可调用_的_)–内置函数(例如 [`torch.add()`](torch.html#torch.add "torch.add"))。 * **args** (_tuple_) – the argument tuple for the `func` invocation. * **kwargs** (_dict_) – is a dictionary of keyword arguments for the `func` invocation. Returns 用户 [`RRef`](#torch.distributed.rpc.RRef "torch.distributed.rpc.RRef") 实例到结果值。 使用阻塞 API [`torch.distributed.rpc.RRef.to_here()`](#torch.distributed.rpc.RRef.to_here "torch.distributed.rpc.RRef.to_here") 在本地检索结果值。 Example: ``` On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() ``` * * * ``` torch.distributed.rpc.get_worker_info(worker_name=None) ``` 获取给定工人名称的`WorkerInfo`。 使用此`WorkerInfo`可以避免在每次调用时传递昂贵的字符串。 Parameters **worker_name** (_str_ )–工人的字符串名称。 如果`None`,则返回当前工作程序的 ID。 (默认`None`) Returns 如果`worker_name`为`None`,则给定当前工作程序的`worker_name`或`WorkerInfo`的`WorkerInfo`实例。 * * * ``` torch.distributed.rpc.shutdown(graceful=True) ``` 关闭 RPC 代理,然后销毁 RPC 代理。 这将阻止本地代理接受未完成的请求,并通过终止所有 RPC 线程来关闭 RPC 框架。 如果 graceful = True,则它将阻塞,直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。 否则,如果 graceful = False,则这是本地关闭,并且它不等待其他 RPC 进程到达此方法。 Parameters **正常** (_bool_ )–是否进行正常关机。 如果为 True,它将阻塞直到所有本地和远程 RPC 进程都达到此方法并等待所有未完成的工作完成。 Example: ``` On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown() On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown() ``` ## 分布式 Autograd 框架 此模块提供了一个基于 RPC 的分布式 autograd 框架,该框架可用于模型并行训练等应用程序。 简而言之,应用程序可以通过 RPC 发送和接收梯度记录张量。 在前向传递中,我们记录何时通过 RPC 发送梯度记录张量,而在后向传递过程中,我们使用此信息使用 RPC 执行分布式后向传递。 有关更多详细信息,请参见[分布式 Autograd 设计](notes/distributed_autograd.html#distributed-autograd-design)。 * * * ``` class torch.distributed.autograd.context ``` 使用分布式 autograd 时要环绕前进和后退传递的上下文对象。 需要`with`语句中生成的`context_id`来唯一标识所有工作程序上的分布式反向传递。 每个工作人员都存储与此`context_id`关联的元数据,这是正确执行分布式自动求导证件所必需的。 Example: ``` >> import torch.distributed.autograd as dist_autograd >> with dist_autograd.context() as context_id: >> t1 = torch.rand((3, 3), requires_grad=True) >> t2 = torch.rand((3, 3), requires_grad=True) >> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >> dist_autograd.backward([loss]) ``` * * * ``` torch.distributed.autograd.backward(roots: List[Tensor]) → None ``` 使用提供的根启动分布式反向传递。 当前,这实现了 [FAST 模式算法](notes/distributed_autograd.html#fast-mode-algorithm),该算法假设在反向传递过程中,跨工作程序在同一分布式 autograd 上下文中发送的所有 RPC 消息将是 autograd 图的一部分。 我们使用提供的根来发现 autograd 图并计算适当的依赖关系。 该方法将阻塞,直到完成整个 autograd 计算。 我们在每个节点上的适当 [`torch.distributed.autograd.context`](#torch.distributed.autograd.context "torch.distributed.autograd.context") 中累积梯度。 当调用 [`torch.distributed.autograd.backward()`](#torch.distributed.autograd.backward "torch.distributed.autograd.backward") 时,使用的 autograd 上下文是该节点的当前 autograd 上下文。 如果没有有效的 autograd 上下文,我们将引发错误。 您可以使用 [`get_gradients()`](#torch.distributed.autograd.get_gradients "torch.distributed.autograd.get_gradients") API 检索累积的梯度。 Parameters **根**(_列表_)–代表自动梯度计算根的张量。 所有张量应为标量。 Example: ``` >> import torch.distributed.autograd as dist_autograd >> with dist_autograd.context() as context_id: >> pred = model.forward() >> loss = loss_func(pred, loss) >> dist_autograd.backward(loss) ``` * * * ``` torch.distributed.autograd.get_gradients(context_id: int) → Dict[Tensor, Tensor] ``` 从张量检索映射,以获取在提供的`context_id`中作为累积的 autograd 向后传递的一部分的张量所对应的张量。 Parameters **context_id** (_python:int_ )–我们应为其检索梯度的 autograd 上下文 ID。 Returns 一个映射,其中键是张量,值是该张量的关联渐变。 Example: ``` >> import torch.distributed.autograd as dist_autograd >> with dist_autograd.context() as context_id: >> t1 = torch.rand((3, 3), requires_grad=True) >> t2 = torch.rand((3, 3), requires_grad=True) >> loss = t1 + t2 >> dist_autograd.backward([loss.sum()]) >> grads = dist_autograd.get_gradients(context_id) >> print (grads[t1]) >> print (grads[t2]) ``` ## 分布式优化器 [`torch.distributed.optim`](#module-torch.distributed.optim "torch.distributed.optim") 公开 DistributedOptimizer,后者获取远程参数列表 ([`RRef`](#torch.distributed.rpc.RRef "torch.distributed.rpc.RRef")),并在参数所在的工作线程中本地运行优化器。 分布式优化器可以使用任何本地优化器[算法](optim.html#optimizer-algorithms)来将梯度应用于每个工作者。 * * * ``` class torch.distributed.optim.DistributedOptimizer(optimizer_class, params_rref, *args, **kwargs) ``` DistributedOptimizer 远程引用分散在工作程序中的参数,并为每个参数在本地应用给定的优化器。 此类使用 [`get_gradients()`](#torch.distributed.autograd.get_gradients "torch.distributed.autograd.get_gradients") 来检索特定参数的梯度。 来自同一客户端或不同客户端的对 [`step()`](#torch.distributed.optim.DistributedOptimizer.step "torch.distributed.optim.DistributedOptimizer.step") 的并发调用将在每个工作人员上进行序列化-因为每个工作人员的优化程序一次只能处理一组渐变。 但是,不能保证完整的前向后向优化程序序列将一次为一个客户端执行。 这意味着所应用的渐变可能不对应于在给定工人上执行的最新前向通过。 此外,也不能保证在所有工人之间订购。 Parameters * **optimizer_class** ([_optim.Optimizer_](optim.html#torch.optim.Optimizer "torch.optim.Optimizer"))–在每个 worker 上实例化的优化器的类。 * **params_rref** (_列表_ _[_ [_RRef_](#torch.distributed.rpc.RRef "torch.distributed.rpc.RRef") _]_ )–本地或本地参考的 RRef 列表 远程参数进行优化。 * **args** –传递给每个工作程序上的优化器构造函数的参数。 * **kwargs** –传递给每个工作程序上的优化器构造函数的参数。 Example: ``` >> import torch.distributed.autograd as dist_autograd >> import torch.distributed.rpc as rpc >> from torch import optim >> from torch.distributed.optim import DistributedOptimizer >> >> with dist_autograd.context() as context_id: >> # Forward pass. >> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >> loss = rref1.to_here() + rref2.to_here() >> >> # Backward pass. >> dist_autograd.backward([loss.sum()]) >> >> # Optimizer. >> dist_optim = DistributedOptimizer( >> optim.SGD, >> [rref1, rref2], >> lr=0.05, >> ) >> dist_optim.step() ``` * * * ``` step() ``` 执行一个优化步骤。 这将在每个包含要优化参数的工作程序上调用 [`torch.optim.Optimizer.step()`](optim.html#torch.optim.Optimizer.step "torch.optim.Optimizer.step") ,并将阻塞直到所有工作程序返回。 当前的分布式 autograd [`context`](#torch.distributed.autograd.context "torch.distributed.autograd.context") 将在全球范围内使用。