64.md 24.7 KB
Newer Older
W
wizardforcel 已提交
1
# 分布式 RPC 框架入门
W
wizardforcel 已提交
2

W
wizardforcel 已提交
3
> 原文:<https://pytorch.org/tutorials/intermediate/rpc_tutorial.html>
W
wizardforcel 已提交
4

W
wizardforcel 已提交
5
**作者**[Shen Li](https://mrshenli.github.io/)
W
wizardforcel 已提交
6 7 8 9 10 11

先决条件:

*   [PyTorch 分布式概述](../beginner/dist_overview.html)
*   [RPC API 文档](https://pytorch.org/docs/master/rpc.html)

W
wizardforcel 已提交
12
本教程使用两个简单的示例来演示如何使用[`torch.distributed.rpc`](https://pytorch.org/docs/master/rpc.html)包构建分布式训练,该包首先在 PyTorch v1.4 中作为原型功能引入。 这两个示例的源代码可以在 [PyTorch 示例](https://github.com/pytorch/examples)中找到。
W
wizardforcel 已提交
13

W
wizardforcel 已提交
14
先前的教程[分布式数据并行入门](ddp_tutorial.html)[使用 PyTorch](dist_tuto.html) 编写分布式应用,描述了[`DistributedDataParallel`](https://pytorch.org/docs/stable/_modules/torch/nn/parallel/distributed.html),该模型支持特定的训练范例,该模型可在多个进程之间复制,每个进程都处理输入数据的拆分。 有时,您可能会遇到需要不同训练范例的场景。 例如:
W
wizardforcel 已提交
15

W
wizardforcel 已提交
16
1.  在强化学习中,从环境中获取训练数据可能相对昂贵,而模型本身可能很小。 在这种情况下,产生多个并行运行的观察者并共享一个智能体可能会很有用。 在这种情况下,智能体将在本地负责训练,但是应用仍将需要库在观察者和训练者之间发送和接收数据。
W
wizardforcel 已提交
17 18
2.  您的模型可能太大,无法容纳在一台计算机上的 GPU 中,因此需要一个库来帮助将模型拆分到多台计算机上。 或者,您可能正在实现[参数服务器](https://www.cs.cmu.edu/~muli/file/parameter_server_osdi14.pdf)训练框架,其中模型参数和训练器位于不同的机器上。

W
wizardforcel 已提交
19
[`torch.distributed.rpc`](https://pytorch.org/docs/master/rpc.html)包可以帮助解决上述情况。 在情况 1 中, [RPC](https://pytorch.org/docs/master/rpc.html#rpc)[RRef](https://pytorch.org/docs/master/rpc.html#rref) 允许将数据从一个工作程序发送到另一个工作程序,同时轻松引用远程数据对象。 在情况 2 中,[分布式 Autograd](https://pytorch.org/docs/master/rpc.html#distributed-autograd-framework)[分布式优化器](https://pytorch.org/docs/master/rpc.html#module-torch.distributed.optim)使执行反向传递和优化器步骤就像本地训练一样。 在接下来的两节中,我们将使用强化学习示例和语言模型示例来演示[`torch.distributed.rpc`](https://pytorch.org/docs/master/rpc.html)的 API。 请注意,本教程并非旨在构建最准确或最有效的模型来解决给定的问题,相反,此处的主要目标是演示如何使用[`torch.distributed.rpc`](https://pytorch.org/docs/master/rpc.html)包来构建分布式训练应用。
W
wizardforcel 已提交
20

W
wizardforcel 已提交
21
## 使用 RPC 和 RRef 的分布式强化学习
W
wizardforcel 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48

本节介绍了使用 RPC 建立玩具分布式强化学习模型以解决 [OpenAI Gym](https://gym.openai.com) 中的 CartPole-v1 的步骤。 策略代码主要是从现有的单线程[示例](https://github.com/pytorch/examples/blob/master/reinforcement_learning)中借用的,如下所示。 我们将跳过`Policy`设计的详细信息,并将重点介绍 RPC 的用法。

```py
import torch.nn as nn
import torch.nn.functional as F

class Policy(nn.Module):

    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)

        self.saved_log_probs = []
        self.rewards = []

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1)

```

W
wizardforcel 已提交
49
首先,让我们准备一个助手,以在`RRef`的所有者工作程序上远程运行函数。 您将在本教程的示例中的多个地方发现该函数。 理想情况下,`torch.distributed.rpc`包应立即提供这些助手函数。 例如,如果应用可以直接调用`RRef.some_func(*arg)`,然后将其转换为`RRef`所有者的 RPC,将会更容易。 在[`pytorch/pytorch#31743`](https://github.com/pytorch/pytorch/issues/31743)中跟踪了此 API 的进度。
W
wizardforcel 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65

```py
from torch.distributed.rpc import rpc_sync

def _call_method(method, rref, *args, **kwargs):
    return method(rref.local_value(), *args, **kwargs)

def _remote_method(method, rref, *args, **kwargs):
    args = [method, rref] + list(args)
    return rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs)

# to call a function on an rref, we could do the following
# _remote_method(some_func, rref, *args)

```

W
wizardforcel 已提交
66
我们准备介绍观察员。 在此示例中,每个观察者创建自己的环境,并等待智能体的命令来运行情节。 在每个情节中,一个观察者最多循环`n_steps`个迭代,并且在每个迭代中,它使用 RPC 将其环境状态传递给智能体并取回操作。 然后,它将该操作应用于其环境,并从环境中获取奖励和下一个状态。 之后,观察者使用另一个 RPC 向智能体报告奖励。 同样,请注意,这显然不是最有效的观察者实现。 例如,一个简单的优化可能是将当前状态和最后的报酬打包到一个 RPC 中,以减少通信开销。 但是,目标是演示 RPC API,而不是为 CartPole 构建最佳的求解器。 因此,在此示例中,让逻辑保持简单,并明确两个步骤。
W
wizardforcel 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107

```py
import argparse
import gym
import torch.distributed.rpc as rpc

parser = argparse.ArgumentParser(
    description="RPC Reinforcement Learning Example",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument('--world_size', default=2, help='Number of workers')
parser.add_argument('--log_interval', default=1, help='Log every log_interval episodes')
parser.add_argument('--gamma', default=0.1, help='how much to value future rewards')
parser.add_argument('--seed', default=1, help='random seed for reproducibility')
args = parser.parse_args()

class Observer:

    def __init__(self):
        self.id = rpc.get_worker_info().id
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)

    def run_episode(self, agent_rref, n_steps):
        state, ep_reward = self.env.reset(), 0
        for step in range(n_steps):
            # send the state to the agent to get an action
            action = _remote_method(Agent.select_action, agent_rref, self.id, state)

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)

            # report the reward to the agent for training purpose
            _remote_method(Agent.report_reward, agent_rref, self.id, reward)

            if done:
                break

```

W
wizardforcel 已提交
108
agent 的代码稍微复杂一点,我们将其分成多个部分。 在此示例中,智能体既充当训练者又充当主角色,以便它向多个分布式观察者发送命令以运行情节,并且还记录本地的所有动作和奖励,这些动作和奖赏将在每个情节之后的训练阶段使用。 下面的代码显示了`Agent`构造器,其中大多数行都在初始化各种组件。 最后的循环在其他工作器上远程初始化观察者,并在本地将`RRefs`保留给这些观察者。 智能体稍后将使用那些观察者`RRefs`发送命令。 应用无需担心`RRefs`的寿命。 每个`RRef`的所有者维护一个引用计数图以跟踪其生命周期,并保证只要该`RRef`的任何活动用户都不会删除远程数据对象。 有关详细信息,请参考`RRef` [设计文档](https://pytorch.org/docs/master/notes/rref.html)
W
wizardforcel 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138

```py
import gym
import numpy as np

import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical

class Agent:
    def __init__(self, world_size):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.saved_log_probs = {}
        self.policy = Policy()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.eps = np.finfo(np.float32).eps.item()
        self.running_reward = 0
        self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(remote(ob_info, Observer))
            self.rewards[ob_info.id] = []
            self.saved_log_probs[ob_info.id] = []

```

W
wizardforcel 已提交
139
接下来,智能体向观察者公开两个 API,以供他们选择动作和报告奖励。 这些函数仅在智能体上本地运行,但是将由观察者通过 RPC 触发。
W
wizardforcel 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156

```py
class Agent:
    ...
    def select_action(self, ob_id, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

    def report_reward(self, ob_id, reward):
        self.rewards[ob_id].append(reward)

```

W
wizardforcel 已提交
157
让我们在智能体上添加`run_episode`函数,该函数告诉所有观察者执行片段。 在此函数中,它首先创建一个列表,以从异步 RPC 收集期货,然后在所有观察者`RRefs`上循环以生成异步 RPC。 在这些 RPC 中,智能体还将自身的`RRef`传递给观察者,以便观察者也可以在智能体上调用函数。 如上所示,每个观察者都将 RPC 返回给智能体,它们是嵌套的 RPC。 在每个情节之后,`saved_log_probs``rewards`将包含记录的动作概率和奖励。
W
wizardforcel 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179

```py
class Agent:
    ...
    def run_episode(self, n_steps=0):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(
                rpc_async(
                    ob_rref.owner(),
                    _call_method,
                    args=(Observer.run_episode, ob_rref, self.agent_rref, n_steps)
                )
            )

        # wait until all obervers have finished this episode
        for fut in futs:
            fut.wait()

```

W
wizardforcel 已提交
180
最后,在一集之后,智能体需要训练模型,该模型在下面的`finish_episode`函数中实现。 此函数中没有 RPC,并且大多数是从单线程[示例](https://github.com/pytorch/examples/blob/master/reinforcement_learning)中借用的。 因此,我们跳过描述其内容。
W
wizardforcel 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216

```py
class Agent:
    ...
    def finish_episode(self):
      # joins probs and rewards from different observers into lists
      R, probs, rewards = 0, [], []
      for ob_id in self.rewards:
          probs.extend(self.saved_log_probs[ob_id])
          rewards.extend(self.rewards[ob_id])

      # use the minimum observer reward to calculate the running reward
      min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
      self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward

      # clear saved probs and rewards
      for ob_id in self.rewards:
          self.rewards[ob_id] = []
          self.saved_log_probs[ob_id] = []

      policy_loss, returns = [], []
      for r in rewards[::-1]:
          R = r + args.gamma * R
          returns.insert(0, R)
      returns = torch.tensor(returns)
      returns = (returns - returns.mean()) / (returns.std() + self.eps)
      for log_prob, R in zip(probs, returns):
          policy_loss.append(-log_prob * R)
      self.optimizer.zero_grad()
      policy_loss = torch.cat(policy_loss).sum()
      policy_loss.backward()
      self.optimizer.step()
      return min_reward

```

W
wizardforcel 已提交
217
使用`Policy``Observer``Agent`类,我们准备启动多个过程来执行分布式训练。 在此示例中,所有进程都运行相同的`run_worker`函数,并且它们使用等级来区分其角色。 等级 0 始终是智能体,其他所有等级都是观察者。 智能体通过重复调用`run_episode``finish_episode`作为主设备,直到运行的奖励超过环境指定的奖励阈值为止。 所有观察者都被动地等待来自智能体的命令。 该代码由[`rpc.init_rpc`](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.init_rpc)[`rpc.shutdown`](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.shutdown)包装,它们分别初始化和终止 RPC 实例。 [API 页面](https://pytorch.org/docs/master/rpc.html)中提供了更多详细信息。
W
wizardforcel 已提交
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265

```py
import os
from itertools import count

import torch.multiprocessing as mp

AGENT_NAME = "agent"
OBSERVER_NAME="obs"
TOTAL_EPISODE_STEP = 100

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size)
        for i_episode in count(1):
            n_steps = int(TOTAL_EPISODE_STEP / (args.world_size - 1))
            agent.run_episode(n_steps=n_steps)
            last_reward = agent.finish_episode()

            if i_episode % args.log_interval == 0:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                      i_episode, last_reward, agent.running_reward))

            if agent.running_reward > agent.reward_threshold:
                print("Solved! Running reward is now {}!".format(agent.running_reward))
                break
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from the agent

    # block until all rpcs finish, and shutdown the RPC instance
    rpc.shutdown()

mp.spawn(
    run_worker,
    args=(args.world_size, ),
    nprocs=args.world_size,
    join=True
)

```

W
wizardforcel 已提交
266
以下是使用`world_size = 2`进行训练时的一些示例输出。
W
wizardforcel 已提交
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301

```py
Episode 10      Last reward: 26.00      Average reward: 10.01
Episode 20      Last reward: 16.00      Average reward: 11.27
Episode 30      Last reward: 49.00      Average reward: 18.62
Episode 40      Last reward: 45.00      Average reward: 26.09
Episode 50      Last reward: 44.00      Average reward: 30.03
Episode 60      Last reward: 111.00     Average reward: 42.23
Episode 70      Last reward: 131.00     Average reward: 70.11
Episode 80      Last reward: 87.00      Average reward: 76.51
Episode 90      Last reward: 86.00      Average reward: 95.93
Episode 100     Last reward: 13.00      Average reward: 123.93
Episode 110     Last reward: 33.00      Average reward: 91.39
Episode 120     Last reward: 73.00      Average reward: 76.38
Episode 130     Last reward: 137.00     Average reward: 88.08
Episode 140     Last reward: 89.00      Average reward: 104.96
Episode 150     Last reward: 97.00      Average reward: 98.74
Episode 160     Last reward: 150.00     Average reward: 100.87
Episode 170     Last reward: 126.00     Average reward: 104.38
Episode 180     Last reward: 500.00     Average reward: 213.74
Episode 190     Last reward: 322.00     Average reward: 300.22
Episode 200     Last reward: 165.00     Average reward: 272.71
Episode 210     Last reward: 168.00     Average reward: 233.11
Episode 220     Last reward: 184.00     Average reward: 195.02
Episode 230     Last reward: 284.00     Average reward: 208.32
Episode 240     Last reward: 395.00     Average reward: 247.37
Episode 250     Last reward: 500.00     Average reward: 335.42
Episode 260     Last reward: 500.00     Average reward: 386.30
Episode 270     Last reward: 500.00     Average reward: 405.29
Episode 280     Last reward: 500.00     Average reward: 443.29
Episode 290     Last reward: 500.00     Average reward: 464.65
Solved! Running reward is now 475.3163778435275!

```

W
wizardforcel 已提交
302
在此示例中,我们展示了如何使用 RPC 作为通信工具来跨工作器传递数据,以及如何使用 RRef 引用远程对象。 的确,您可以直接在`ProcessGroup` `send``recv` API 之上构建整个结构,也可以使用其他通信/ RPC 库。 但是,通过使用`torch.distributed.rpc`,您可以在后台获得本机支持并不断优化性能。
W
wizardforcel 已提交
303

W
wizardforcel 已提交
304
接下来,我们将展示如何将 RPC 和 RRef 与分布式 Autograd 和分布式优化器结合起来执行分布式模型并行训练。
W
wizardforcel 已提交
305

W
wizardforcel 已提交
306
## 使用分布式 Autograd 和分布式优化器的分布式 RNN
W
wizardforcel 已提交
307

W
wizardforcel 已提交
308
在本节中,我们将使用 RNN 模型来展示如何使用 RPC API 构建分布式模型并行训练。 示例 RNN 模型非常小,可以轻松地放入单个 GPU 中,但是我们仍将其层划分为两个不同的工作器来演示这一想法。 开发人员可以应用类似的技术在多个设备和机器上分布更大的模型。
W
wizardforcel 已提交
309

W
wizardforcel 已提交
310
RNN 模型设计是从 PyTorch [示例](https://github.com/pytorch/examples/tree/master/word_language_model)存储库中的词语言模型中借用的,该存储库包含三个主要组件,一个嵌入表,一个`LSTM`层和一个解码器。 下面的代码将嵌入表和解码器包装到子模块中,以便它们的构造器可以传递给 RPC API。 在`EmbeddingTable`子模块中,我们有意将`Embedding`层放在 GPU 上以涵盖用例。 在 v1.4 中,RPC 始终在目标工作线程上创建 CPU 张量参数或返回值。 如果函数使用 GPU 张量,则需要将其显式移动到适当的设备。
W
wizardforcel 已提交
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338

```py
class EmbeddingTable(nn.Module):
    r"""
    Encoding layers of the RNNModel
    """
    def __init__(self, ntoken, ninp, dropout):
        super(EmbeddingTable, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp).cuda()
        self.encoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, input):
        return self.drop(self.encoder(input.cuda()).cpu()

class Decoder(nn.Module):
    def __init__(self, ntoken, nhid, dropout):
        super(Decoder, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.decoder = nn.Linear(nhid, ntoken)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, output):
        return self.decoder(self.drop(output))

```

W
wizardforcel 已提交
339
使用上述子模块,我们现在可以使用 RPC 将它们组合在一起以创建 RNN 模型。 在下面的代码中,`ps`代表参数服务器,该服务器托管嵌入表和解码器的参数。 构造器使用[远程](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.remote) API 在参数服务器上创建`EmbeddingTable`对象和`Decoder`对象,并在本地创建`LSTM`子模块。 在前进过程中,训练器使用`EmbeddingTable` `RRef`查找远程子模块,然后使用 RPC 将输入数据传递到`EmbeddingTable`,并获取查找结果。 然后,它通过本地`LSTM`层运行嵌入,最后使用另一个 RPC 将输出发送到`Decoder`子模块。 通常,要实现分布式模型并行训练,开发人员可以将模型分为多个子模块,调用 RPC 远程创建子模块实例,并在必要时使用`RRef`查找它们。 正如您在下面的代码中看到的那样,它看起来与单机模型并行训练非常相似。 主要区别是用 RPC 函数替换了`Tensor.to(device)`
W
wizardforcel 已提交
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362

```py
class RNNModel(nn.Module):
    def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()

        # setup embedding table remotely
        self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
        # setup LSTM locally
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
        # setup decoder remotely
        self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))

    def forward(self, input, hidden):
        # pass input to the remote embedding table and fetch emb tensor back
        emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
        output, hidden = self.rnn(emb, hidden)
        # pass output to the rremote decoder and get the decoded output back
        decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
        return decoded, hidden

```

W
wizardforcel 已提交
363
在介绍分布式优化器之前,让我们添加一个辅助函数来生成模型参数的 RRef 列表,该列表将由分布式优化器使用。 在本地训练中,应用可以调用`Module.parameters()`来获取对所有参数张量的引用,并将其传递给本地优化器以进行后续更新。 但是,由于某些参数存在于远程计算机上,因此同一 API 在分布式训练方案中不起作用。 因此,分布式优化器不采用参数`Tensors`的列表,而是采用`RRefs`的列表,每个模型参数一个`RRef`用于本地和远程模型参数。 辅助函数非常简单,只需调用`Module.parameters()`并在每个参数上创建一个本地`RRef`
W
wizardforcel 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390

```py
def _parameter_rrefs(module):
    param_rrefs = []
    for param in module.parameters():
        param_rrefs.append(RRef(param))
    return param_rrefs

```

然后,由于`RNNModel`包含三个子模块,因此我们需要调用`_parameter_rrefs` 3 次,并将其包装到另一个辅助函数中。

```py
class RNNModel(nn.Module):
    ...
    def parameter_rrefs(self):
        remote_params = []
        # get RRefs of embedding table
        remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
        # create RRefs for local parameters
        remote_params.extend(_parameter_rrefs(self.rnn))
        # get RRefs of decoder
        remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
        return remote_params

```

W
wizardforcel 已提交
391
现在,我们准备实现训练循环。 初始化模型参数后,我们创建`RNNModel``DistributedOptimizer`。 分布式优化器将采用参数`RRefs`的列表,查找所有不同的所有者工作器,并在每个所有者工作器上创建给定的本地优化器(即,在这种情况下,您也可以使用其他本地优化器`SGD`) 使用给定的参数(即`lr=0.05`)。
W
wizardforcel 已提交
392

W
wizardforcel 已提交
393
在训练循环中,它首先创建一个分布式 Autograd 上下文,这将帮助分布式 Autograd 引擎查找梯度和涉及的 RPC 发送/接收函数。 分布式 Autograd 引擎的设计详细信息可以在其[设计说明](https://pytorch.org/docs/master/notes/distributed_autograd.html)中找到。 然后,它像本地模型一样开始正向传播,并运行分布式后向传递。 对于后向分布,您只需要指定一个根列表,在这种情况下,就是损失`Tensor`。 分布式 Autograd 引擎将自动遍历分布式图形并正确编写梯度。 接下来,它在分布式优化器上运行`step`函数,该函数将与所有涉及的本地优化器联系以更新模型参数。 与本地训练相比,一个较小的差异是您不需要运行`zero_grad()`,因为每个 Autograd 上下文都有专用的空间来存储梯度,并且在每次迭代创建上下文时,来自不同迭代的那些梯度不会累积到同一组`Tensors`
W
wizardforcel 已提交
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445

```py
def run_trainer():
    batch = 5
    ntoken = 10
    ninp = 2

    nhid = 3
    nindices = 3
    nlayers = 4
    hidden = (
        torch.randn(nlayers, nindices, nhid),
        torch.randn(nlayers, nindices, nhid)
    )

    model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)

    # setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

    def get_next_batch():
        for _ in range(5):
            data = torch.LongTensor(batch, nindices) % ntoken
            target = torch.LongTensor(batch, ntoken) % nindices
            yield data, target

    # train for 10 iterations
    for epoch in range(10):
        for data, target in get_next_batch():
            # create distributed autograd context
            with dist_autograd.context() as context_id:
                hidden[0].detach_()
                hidden[1].detach_()
                output, hidden = model(data, hidden)
                loss = criterion(output, target)
                # run distributed backward pass
                dist_autograd.backward(context_id, [loss])
                # run distributed optimizer
                opt.step(context_id)
                # not necessary to zero grads since they are
                # accumulated into the distributed autograd context
                # which is reset every iteration.
        print("Training epoch {}".format(epoch))

```

W
wizardforcel 已提交
446
最后,让我们添加一些粘合代码以启动参数服务器和训练器流程。
W
wizardforcel 已提交
447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467

```py
def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        rpc.init_rpc("trainer", rank=rank, world_size=world_size)
        _run_trainer()
    else:
        rpc.init_rpc("ps", rank=rank, world_size=world_size)
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()

if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)

```