@@ -52,8 +52,9 @@ The IR for PaddlePaddle after refactoring is called a `Block`, it specifies the
The user can not directly specify the parameter update rule for the parameter server in the Python module, since the parameter server does not use the same computation definition as the trainer. Instead, the update rule is baked inside the parameter server. The user can not specify the update rule explicitly.
This could be fixed by making the parameter server run the same computation definition as the trainer (the user's Python module). For a detailed explanation, refer to this document -
[Design Doc: Operation Graph Based Parameter Server](./parameter_server.md)
This could be fixed by making the parameter server also run an IR, which can be different to the trainer side
For a detailed explanation, refer to this document -
The above code is what a typical Python trainer code is, the neural network topology is built using the helper functions such as `paddle.layer.fc`. Training is done by calling `session.eval` iteratively.
#### session.eval
As shown in the graph, `session.eval` sends the IR and the evaluation inputs or targets to the PaddlePaddle cluster for evaluation.
The targets can be any variable in the computation graph. When the target is say, the `optimizer` variable, the neural network will be optimized once. When the target is the `cost` variable, `session.eval` returns the cost value. Based on what the target is, an appropriate action is taken.
The Python `session` is a wrapper of the C++ `Session` class. For more information about `Session`, refer to this document - [Design Doc: Session](./session.md).
### PaddlePaddle Converter
The PaddlePaddle converter automatically converts the IR in the request (IR and evaluation inputs/targets) from PaddlePaddle Python to partitioned IRs and dispatches the new IRs and evaluation inputs/targets to different PaddlePaddle runtimes. Below are the steps that are followed :
1. Add a `feed` OP that feeds the eval inputs, and a `fetch` OP that fetches the eval targets to the IR.
2. Extract a new computation (sub)graph with the `feed` and `fetch` OPs as the boundary. The runtime does not need to run the OP that is not dependent on the `fetch` OP.
3. Optimize the computation graph.
4. Place the OPs in the graph onto different devices on different PaddlePaddle runtime according to a placement algorithm and the device constraints specified by the user.
5. Partition the graph according to runtime boundaries and add `send` / `recv` OP pair on the runtime boundaries.
The code above is a typical local training program, the "Training Program" is built using helper functions such as
`fluid.layer.fc`. The training is done by calling `Executor.run`
iteratively.
For more details, the implementation of IR is [Program](../program.md), and `ProgramDesc` is the protobuf type.
[Executor](../executor.md) simply runs the `ProgramDesc`. For local training you generally use
`Executor` to run the program locally. For any kind of distributed training, you can use
`RemoteExecutor` to specify desired distributed training method with some optional arguments.
### Distributed Transpiler
The Distributed Transpiler automatically converts the IR (in protobuf format) to partitioned IRs. Then
the Remote Executor dispatches the new IRs to Remote Executors across the cluster.
Below are the steps that are followed :
1. User only need to change `Executor` to `RemoteExecutor` to change local program to distributed program.
1. `RemoteExecutor` calls `Distributed Transpiler` to "transpile" user's program to several IRs representing a
distributed training program:
1. Parse configurations from `RemoteExecutor`.
1. Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming.
1. Partition the `ProgramDesc` according to type and add `send` / `recv` OP pair on the boundaries. Take
DataParallelism type for example, it removes the optimization operators and add a `send` OP to the
"trainer" role, then add the optimization operators to the parameter server role within the `recv` OP.
1. Dispatch the partitioned graph to different `RemoteExecutor` in the cluster.
1. `RemoteExecutor` on each node run the received `ProgramDesc` utill the end.
### RemoteExecutor
As shown in the graph, `RemoteExecutor.run` sends the IR to the cluster for Execution.
You can also use parameter `fetch_list` to interactively fetch variable back to local for
log printing.
The Python `RemoteExecutor` is derived from `Executor` class.
```python
exe = RemoteExecutor(
feed=feeder.feed(data),
fetch_list=[avg_cost],
job_desc=JobDesc(
jobname,
num_trainer,
num_pserver,
cpu_per_trainer,
gpu_per_trainer,
mem_per_trainer,
cpu_per_pserver,
mem_per_pserver
))
for data in train_reader():
loss, acc = exe.run(trainer_prog,
feed=feeder.feed(data),
fetch_list=[avg_cost])
```
6. Dispatch the partitioned graph to different PaddlePaddle runtimes.
`JobDesc` object describe the distributed job resource specification to run on
Cluster environment.
7. PaddlePaddle runtimes with the `fetch` OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python.
<img src="src/remote_executor.png"/>
The output IRs will be cached to optimize the conversion latency.
to a server in the cluster which executes `RemoteExecutor.listen`. This server is responsible
to start the final Kubernetes Jobs to run the different role of `ProgramDesc`.
#### Placement Algorithm
### Placement Algorithm
Our first implementation will only support "trainer-parameter server" placement: the parameters, initializers, and optimizers are all placed on the PaddlePaddle runtimes with the parameter server role. Everything else will be placed on the PaddlePaddle runtimes with the trainer role. This has the same functionality as the "trainer-parameter server" architecture of PaddlePaddle v0.10.0, but is more generic and flexible.
In the future, a more general placement algorithm should be implemented, which makes placements according to the input IR, and a model of device computation time and device communication time. Model parallelism requires the generic placement algorithm.
### PaddlePaddle Runtime
The PaddlePaddle runtime owns multiple devices (e.g., CPUs, GPUs) and runs the IR. The runtime does not need to do OP placement since it is already done by the converter.
### Local Training Architecture
The local training architecture will be the same as the distributed training architecture, the difference is that everything runs locally, and there is just one PaddlePaddle runtime:
...
...
@@ -132,9 +176,18 @@ The local training architecture will be the same as the distributed training arc
### Training Data
In PaddlePaddle v0.10.0, training data is typically read with a [data reader](../reader/README.md) from Python. This approach is no longer efficient when training in a distributed fashion since the Python process no longer runs on the same node with the trainer processes. The Python reader will need to read from the distributed filesystem (assuming it has the required access) and send to the trainers, doubling the network traffic.
When doing distributed training, the user can still use Python data reader: the training data are sent with `session.eval`. However this should be used for debugging purpose only. The users are encouraged to use the read data OPs.
In PaddlePaddle v0.10.0, training data is typically read
with [data reader](../reader/README.md) from Python. This approach is
no longer efficient when training distributedly since the Python
process no longer runs on the same node with the trainer processes,
the Python reader will need to read from the distributed filesystem
(assuming it has the access) and send to the trainers, doubling the
network traffic.
When doing distributed training, the user can still use Python data
reader: the training data are sent with `Executor.run`. However, should
be used for debugging purpose only. The users are encouraged to use
@@ -251,62 +251,109 @@ computation is only specified in Python code which sits outside of PaddlePaddle,
<divclass="section"id="limitation-3">
<spanid="limitation-3"></span><h3>Limitation 3<aclass="headerlink"href="#limitation-3"title="Permalink to this headline">¶</a></h3>
<p>The user can not directly specify the parameter update rule for the parameter server in the Python module, since the parameter server does not use the same computation definition as the trainer. Instead, the update rule is baked inside the parameter server. The user can not specify the update rule explicitly.</p>
<p>This could be fixed by making the parameter server run the same computation definition as the trainer (the user’s Python module). For a detailed explanation, refer to this document -
<aclass="reference internal"href="parameter_server.html"><spanclass="doc">Design Doc: Operation Graph Based Parameter Server</span></a></p>
<p>This could be fixed by making the parameter server also run an IR, which can be different to the trainer side
For a detailed explanation, refer to this document -
<spanid="distributed-training-architecture"></span><h2>Distributed Training Architecture<aclass="headerlink"href="#distributed-training-architecture"title="Permalink to this headline">¶</a></h2>
<p>The revamped distributed training architecture can address the above discussed limitations. Below is the illustration of how it does so:</p>
<p>The major components in the architecture are: <em>PaddlePaddle Python</em>, <em>PaddlePaddle converter</em> and <em>PaddlePaddle runtime</em>.</p>
<divclass="section"id="paddlepaddle-python">
<spanid="paddlepaddle-python"></span><h3>PaddlePaddle Python<aclass="headerlink"href="#paddlepaddle-python"title="Permalink to this headline">¶</a></h3>
<p>PaddlePaddle Python is the Python library that user’s Python code invokes, to read the data. build the neural network topology, start training, etc.</p>
<spanclass="nb">input</span><spanclass="o">=</span><spanclass="n">paddle</span><spanclass="o">.</span><spanclass="n">op</span><spanclass="o">.</span><spanclass="n">recordIO</span><spanclass="p">(</span><spanclass="s2">"/home/data/mnist.recordio"</span><spanclass="p">)</span><spanclass="c1"># file stored on the cluster</span>
<p>The major components are: <em>Python API</em>, <em>Distribute Transpiler</em> and <em>Remote Executor</em>.</p>
<divclass="section"id="python-api">
<spanid="python-api"></span><h3>Python API<aclass="headerlink"href="#python-api"title="Permalink to this headline">¶</a></h3>
<p>Python API is the Python library that user’s Python code invokes, to read the data, build the neural network topology, and start training, etc.</p>
<p>The above code is what a typical Python trainer code is, the neural network topology is built using the helper functions such as <codeclass="docutils literal"><spanclass="pre">paddle.layer.fc</span></code>. Training is done by calling <codeclass="docutils literal"><spanclass="pre">session.eval</span></code> iteratively.</p>
<divclass="section"id="session-eval">
<spanid="session-eval"></span><h4>session.eval<aclass="headerlink"href="#session-eval"title="Permalink to this headline">¶</a></h4>
<p>As shown in the graph, <codeclass="docutils literal"><spanclass="pre">session.eval</span></code> sends the IR and the evaluation inputs or targets to the PaddlePaddle cluster for evaluation.
The targets can be any variable in the computation graph. When the target is say, the <codeclass="docutils literal"><spanclass="pre">optimizer</span></code> variable, the neural network will be optimized once. When the target is the <codeclass="docutils literal"><spanclass="pre">cost</span></code> variable, <codeclass="docutils literal"><spanclass="pre">session.eval</span></code> returns the cost value. Based on what the target is, an appropriate action is taken.</p>
<p>The Python <codeclass="docutils literal"><spanclass="pre">session</span></code> is a wrapper of the C++ <codeclass="docutils literal"><spanclass="pre">Session</span></code> class. For more information about <codeclass="docutils literal"><spanclass="pre">Session</span></code>, refer to this document - <aclass="reference internal"href="session.html"><spanclass="doc">Design Doc: Session</span></a>.</p>
<p>The code above is a typical local training program, the “Training Program” is built using helper functions such as
<codeclass="docutils literal"><spanclass="pre">fluid.layer.fc</span></code>. The training is done by calling <codeclass="docutils literal"><spanclass="pre">Executor.run</span></code>
iteratively.</p>
<p>For more details, the implementation of IR is <aclass="reference internal"href="../program.html"><spanclass="doc">Program</span></a>, and <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code> is the protobuf type.</p>
<p><aclass="reference internal"href="../executor.html"><spanclass="doc">Executor</span></a> simply runs the <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code>. For local training you generally use
<codeclass="docutils literal"><spanclass="pre">Executor</span></code> to run the program locally. For any kind of distributed training, you can use
<codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> to specify desired distributed training method with some optional arguments.</p>
</div>
</div>
<divclass="section"id="paddlepaddle-converter">
<spanid="paddlepaddle-converter"></span><h3>PaddlePaddle Converter<aclass="headerlink"href="#paddlepaddle-converter"title="Permalink to this headline">¶</a></h3>
<p>The PaddlePaddle converter automatically converts the IR in the request (IR and evaluation inputs/targets) from PaddlePaddle Python to partitioned IRs and dispatches the new IRs and evaluation inputs/targets to different PaddlePaddle runtimes. Below are the steps that are followed :</p>
<divclass="section"id="distributed-transpiler">
<spanid="distributed-transpiler"></span><h3>Distributed Transpiler<aclass="headerlink"href="#distributed-transpiler"title="Permalink to this headline">¶</a></h3>
<p>The Distributed Transpiler automatically converts the IR (in protobuf format) to partitioned IRs. Then
the Remote Executor dispatches the new IRs to Remote Executors across the cluster.
Below are the steps that are followed :</p>
<olclass="simple">
<li>Add a <codeclass="docutils literal"><spanclass="pre">feed</span></code> OP that feeds the eval inputs, and a <codeclass="docutils literal"><spanclass="pre">fetch</span></code> OP that fetches the eval targets to the IR.</li>
<li>Extract a new computation (sub)graph with the <codeclass="docutils literal"><spanclass="pre">feed</span></code> and <codeclass="docutils literal"><spanclass="pre">fetch</span></code> OPs as the boundary. The runtime does not need to run the OP that is not dependent on the <codeclass="docutils literal"><spanclass="pre">fetch</span></code> OP.</li>
<li>Optimize the computation graph.</li>
<li>Place the OPs in the graph onto different devices on different PaddlePaddle runtime according to a placement algorithm and the device constraints specified by the user.</li>
<li>Partition the graph according to runtime boundaries and add <codeclass="docutils literal"><spanclass="pre">send</span></code> / <codeclass="docutils literal"><spanclass="pre">recv</span></code> OP pair on the runtime boundaries.</li>
<li>Dispatch the partitioned graph to different PaddlePaddle runtimes.</li>
<li>PaddlePaddle runtimes with the <codeclass="docutils literal"><spanclass="pre">fetch</span></code> OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python.</li>
<li>User only need to change <codeclass="docutils literal"><spanclass="pre">Executor</span></code> to <codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> to change local program to distributed program.</li>
<li><codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> calls <codeclass="docutils literal"><spanclass="pre">Distributed</span><spanclass="pre">Transpiler</span></code> to “transpile” user’s program to several IRs representing a
distributed training program:<ol>
<li>Parse configurations from <codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code>.</li>
<li>Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming.</li>
<li>Partition the <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code> according to type and add <codeclass="docutils literal"><spanclass="pre">send</span></code> / <codeclass="docutils literal"><spanclass="pre">recv</span></code> OP pair on the boundaries. Take
DataParallelism type for example, it removes the optimization operators and add a <codeclass="docutils literal"><spanclass="pre">send</span></code> OP to the
“trainer” role, then add the optimization operators to the parameter server role within the <codeclass="docutils literal"><spanclass="pre">recv</span></code> OP.</li>
</ol>
</li>
<li>Dispatch the partitioned graph to different <codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> in the cluster.</li>
<li><codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> on each node run the received <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code> utill the end.</li>
</ol>
<p>The output IRs will be cached to optimize the conversion latency.</p>
</div>
<divclass="section"id="remoteexecutor">
<spanid="remoteexecutor"></span><h3>RemoteExecutor<aclass="headerlink"href="#remoteexecutor"title="Permalink to this headline">¶</a></h3>
<p>As shown in the graph, <codeclass="docutils literal"><spanclass="pre">RemoteExecutor.run</span></code> sends the IR to the cluster for Execution.
You can also use parameter <codeclass="docutils literal"><spanclass="pre">fetch_list</span></code> to interactively fetch variable back to local for
log printing.</p>
<p>The Python <codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> is derived from <codeclass="docutils literal"><spanclass="pre">Executor</span></code> class.</p>
<p><codeclass="docutils literal"><spanclass="pre">JobDesc</span></code> object describe the distributed job resource specification to run on
Cluster environment.</p>
<p><imgsrc="src/remote_executor.png"/></p>
<p><codeclass="docutils literal"><spanclass="pre">RemoteExecutor.run</span></code> sends the <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code> and
to a server in the cluster which executes <codeclass="docutils literal"><spanclass="pre">RemoteExecutor.listen</span></code>. This server is responsible
to start the final Kubernetes Jobs to run the different role of <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code>.</p>
</div>
<divclass="section"id="placement-algorithm">
<spanid="placement-algorithm"></span><h4>Placement Algorithm<aclass="headerlink"href="#placement-algorithm"title="Permalink to this headline">¶</a></h4>
<spanid="placement-algorithm"></span><h3>Placement Algorithm<aclass="headerlink"href="#placement-algorithm"title="Permalink to this headline">¶</a></h3>
<p>Our first implementation will only support “trainer-parameter server” placement: the parameters, initializers, and optimizers are all placed on the PaddlePaddle runtimes with the parameter server role. Everything else will be placed on the PaddlePaddle runtimes with the trainer role. This has the same functionality as the “trainer-parameter server” architecture of PaddlePaddle v0.10.0, but is more generic and flexible.</p>
<p>In the future, a more general placement algorithm should be implemented, which makes placements according to the input IR, and a model of device computation time and device communication time. Model parallelism requires the generic placement algorithm.</p>
</div>
</div>
<divclass="section"id="paddlepaddle-runtime">
<spanid="paddlepaddle-runtime"></span><h3>PaddlePaddle Runtime<aclass="headerlink"href="#paddlepaddle-runtime"title="Permalink to this headline">¶</a></h3>
<p>The PaddlePaddle runtime owns multiple devices (e.g., CPUs, GPUs) and runs the IR. The runtime does not need to do OP placement since it is already done by the converter.</p>
<spanid="local-training-architecture"></span><h3>Local Training Architecture<aclass="headerlink"href="#local-training-architecture"title="Permalink to this headline">¶</a></h3>
<p>The local training architecture will be the same as the distributed training architecture, the difference is that everything runs locally, and there is just one PaddlePaddle runtime:</p>
...
...
@@ -314,8 +361,17 @@ The targets can be any variable in the computation graph. When the target is say
</div>
<divclass="section"id="training-data">
<spanid="training-data"></span><h3>Training Data<aclass="headerlink"href="#training-data"title="Permalink to this headline">¶</a></h3>
<p>In PaddlePaddle v0.10.0, training data is typically read with a <aclass="reference internal"href="../reader/README.html"><spanclass="doc">data reader</span></a> from Python. This approach is no longer efficient when training in a distributed fashion since the Python process no longer runs on the same node with the trainer processes. The Python reader will need to read from the distributed filesystem (assuming it has the required access) and send to the trainers, doubling the network traffic.</p>
<p>When doing distributed training, the user can still use Python data reader: the training data are sent with <codeclass="docutils literal"><spanclass="pre">session.eval</span></code>. However this should be used for debugging purpose only. The users are encouraged to use the read data OPs.</p>
<p>In PaddlePaddle v0.10.0, training data is typically read
with <aclass="reference internal"href="../reader/README.html"><spanclass="doc">data reader</span></a> from Python. This approach is
no longer efficient when training distributedly since the Python
process no longer runs on the same node with the trainer processes,
the Python reader will need to read from the distributed filesystem
(assuming it has the access) and send to the trainers, doubling the
network traffic.</p>
<p>When doing distributed training, the user can still use Python data
reader: the training data are sent with <codeclass="docutils literal"><spanclass="pre">Executor.run</span></code>. However, should
be used for debugging purpose only. The users are encouraged to use
<spanid="design-doc-operation-graph-based-parameter-server"></span><h1>Design Doc: Operation Graph Based Parameter Server<aclass="headerlink"href="#design-doc-operation-graph-based-parameter-server"title="Permalink to this headline">¶</a></h1>
<spanid="design-doc-parameter-server"></span><h1>Design Doc: Parameter Server<aclass="headerlink"href="#design-doc-parameter-server"title="Permalink to this headline">¶</a></h1>
<divclass="section"id="abstract">
<spanid="abstract"></span><h2>Abstract<aclass="headerlink"href="#abstract"title="Permalink to this headline">¶</a></h2>
<p>We propose an approach to implement the parameter server. In this
...
...
@@ -220,7 +220,7 @@ different purposes.</p>
<divclass="section"id="background">
<spanid="background"></span><h2>Background<aclass="headerlink"href="#background"title="Permalink to this headline">¶</a></h2>
<p>The previous implementations of the parameter server does not run a
communication and checkpointing are implemented twice on both the
trainer and the parameter server.</p>
<p>It would be great if we can write code once and use them on both the
...
...
@@ -232,10 +232,10 @@ server becomes a natural extension.</p>
</div>
<divclass="section"id="design">
<spanid="design"></span><h2>Design<aclass="headerlink"href="#design"title="Permalink to this headline">¶</a></h2>
<divclass="section"id="graph-converter">
<spanid="graph-converter"></span><h3>Graph Converter<aclass="headerlink"href="#graph-converter"title="Permalink to this headline">¶</a></h3>
<p>The <em>graph converter</em> converts the user-defined operation (OP) graph
into subgraphs to be scheduled on different nodes with the following
<divclass="section"id="distributed-transpiler">
<spanid="distributed-transpiler"></span><h3>Distributed Transpiler<aclass="headerlink"href="#distributed-transpiler"title="Permalink to this headline">¶</a></h3>
<p>The <em>Distributed Transpiler</em> converts the user-defined fluid program
into sub-programs to be scheduled on different nodes with the following
steps:</p>
<olclass="simple">
<li>OP placement: the OPs will be placed on different nodes according
...
...
@@ -252,8 +252,8 @@ subgraphs for the trainer and the parameter server:</p>
<p>After converting:</p>
<p><imgsrc="src/dist-graph.png"width="700"/></p>
<olclass="simple">
<li>The parameter variable W and it’s optimizer subgraph are placed on the parameter server.</li>
<li>Operators are added to the subgraphs.<ul>
<li>The parameter variable W and it’s optimizer program are placed on the parameter server.</li>
<li>Operators are added to the program.<ul>
<li><em>Send</em> sends data to the connected <em>Recv</em> operator. The
scheduler on the receive node will only schedule <em>Recv</em> operator
to run when the <em>Send</em> operator has ran (the <em>Send</em> OP will mark
...
...
@@ -271,11 +271,10 @@ tensors.</li>
<spanid="benefits"></span><h3>Benefits<aclass="headerlink"href="#benefits"title="Permalink to this headline">¶</a></h3>
<ulclass="simple">
<li>Model parallelism become easier to implement: it’s an extension to
the trainer - parameter server approach. we already have the
communication OPs, but need to extend the graph converter’s
placement functionality.</li>
the trainer - parameter server approach. We can have several “Transpilers”
to achieve different goals.</li>
<li>User-defined optimizer is easier to add - user can now express it as
a subgraph.</li>
a sub-program.</li>
<li>No more duplication logic inside the trainer and the parameter
server mentioned in the background section.</li>
</ul>
...
...
@@ -283,24 +282,21 @@ server mentioned in the background section.</li>
<divclass="section"id="challenges">
<spanid="challenges"></span><h3>Challenges<aclass="headerlink"href="#challenges"title="Permalink to this headline">¶</a></h3>
<ulclass="simple">
<li>It might be hard for the graph converter to cut a general graph
(without any hint for which subgraph is the optimizer). We may need
to label which subgraph inside the OP graph is the optimizer.</li>
<li>It’s important to balance the parameter shards of on multiple
parameter server. If a single parameter is very big (some
word-embedding, fully connected, softmax layer), we need to
automatically partition the single parameter onto different
parameter servers when possible (only element-wise optimizer depends
on the parameter variable).</li>
<li>In the “Aync SGD” figure, the “W” variable on the parameter server
could be read and wrote concurrently. See
<aclass="reference external"href="https://github.com/PaddlePaddle/Paddle/pull/6394">here</a> for more
details about concurrent program in fluid.</li>
</ul>
</div>
<divclass="section"id="discussion">
<spanid="discussion"></span><h3>Discussion<aclass="headerlink"href="#discussion"title="Permalink to this headline">¶</a></h3>
<ulclass="simple">
<li>In the “Aync SGD” figure, the “W” variable on the parameter server
could be read and wrote concurrently, what is our locking strategy?
E.g., each variable have a lock cpp method to be invoked by every
OP, or, have a lock OP.</li>
<li>Can the Enqueue OP be implemented under our current tensor design
(puts the input tensor into the queue tensor)?</li>
<li><em>Dequeue</em> OP will have variable numbers of output (depends on the
the <aclass="reference external"href="design/design/program.md">ProgramDesc</a>, <codeclass="docutils literal"><spanclass="pre">eval()</span></code> will infer the
ProgramDesc from the given targets and run the PaddlePaddle
program. Please
see
<aclass="reference external"href="/design/refactor/distributed_architecture.html#local-training-architecture">this graph</a> for
the detailed illustration for the local session
and
<aclass="reference external"href="/design/refactor/distributed_architecture.html#distributed-training-architecture">this graph</a> for
the detailed illustration for the remote session.</p>
</li>
<li><pclass="first"><em>feed_dict</em>: a dictionary that contains the tensors which override
the edges of the computation graph.</p>
<p>feed_dict not only can provide the input data, it can override any
<p>Closes the session and releases the scope that the session owns.</p>
</div>
<divclass="section"id="create-a-local-session">
<spanid="create-a-local-session"></span><h3>Create a Local Session<aclass="headerlink"href="#create-a-local-session"title="Permalink to this headline">¶</a></h3>
<p>Creates a new session. One session owns one global scope, so creating
multiple sessions will create different scopes.</p>
<ulclass="simple">
<li><em>devices</em>: a single <codeclass="docutils literal"><spanclass="pre">string</span></code> or a list of <codeclass="docutils literal"><spanclass="pre">string</span></code> of device names,
the corresponding devices will be the computation devices for
<codeclass="docutils literal"><spanclass="pre">eval()</span></code>. If not specified, all available devices (e.g., all GPUs)
will be used. The user doesn’t need to specify the CPU device since
it will be always used. Multiple sessions can use the same device.</li>
</ul>
<divclass="section"id="example">
<spanid="example"></span><h4>Example<aclass="headerlink"href="#example"title="Permalink to this headline">¶</a></h4>
<spanid="create-a-remote-session"></span><h3>Create a Remote Session<aclass="headerlink"href="#create-a-remote-session"title="Permalink to this headline">¶</a></h3>
<spanid="id1"></span><h4>Example<aclass="headerlink"href="#example"title="Permalink to this headline">¶</a></h4>
<divclass="highlight-Python"><divclass="highlight"><pre><span></span><spanclass="n">reader</span><spanclass="o">=</span><spanclass="n">paddle</span><spanclass="o">.</span><spanclass="n">reader</span><spanclass="o">.</span><spanclass="n">recordio</span><spanclass="p">(</span><spanclass="s2">"/pfs/home/peter/mnist-train-*"</span><spanclass="p">)</span><spanclass="c1"># data stored on Paddle Cloud</span>
Built with <ahref="http://sphinx-doc.org/">Sphinx</a> using a <ahref="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <ahref="https://readthedocs.org">Read the Docs</a>.
@@ -52,8 +52,9 @@ The IR for PaddlePaddle after refactoring is called a `Block`, it specifies the
The user can not directly specify the parameter update rule for the parameter server in the Python module, since the parameter server does not use the same computation definition as the trainer. Instead, the update rule is baked inside the parameter server. The user can not specify the update rule explicitly.
This could be fixed by making the parameter server run the same computation definition as the trainer (the user's Python module). For a detailed explanation, refer to this document -
[Design Doc: Operation Graph Based Parameter Server](./parameter_server.md)
This could be fixed by making the parameter server also run an IR, which can be different to the trainer side
For a detailed explanation, refer to this document -
The above code is what a typical Python trainer code is, the neural network topology is built using the helper functions such as `paddle.layer.fc`. Training is done by calling `session.eval` iteratively.
#### session.eval
As shown in the graph, `session.eval` sends the IR and the evaluation inputs or targets to the PaddlePaddle cluster for evaluation.
The targets can be any variable in the computation graph. When the target is say, the `optimizer` variable, the neural network will be optimized once. When the target is the `cost` variable, `session.eval` returns the cost value. Based on what the target is, an appropriate action is taken.
The Python `session` is a wrapper of the C++ `Session` class. For more information about `Session`, refer to this document - [Design Doc: Session](./session.md).
### PaddlePaddle Converter
The PaddlePaddle converter automatically converts the IR in the request (IR and evaluation inputs/targets) from PaddlePaddle Python to partitioned IRs and dispatches the new IRs and evaluation inputs/targets to different PaddlePaddle runtimes. Below are the steps that are followed :
1. Add a `feed` OP that feeds the eval inputs, and a `fetch` OP that fetches the eval targets to the IR.
2. Extract a new computation (sub)graph with the `feed` and `fetch` OPs as the boundary. The runtime does not need to run the OP that is not dependent on the `fetch` OP.
3. Optimize the computation graph.
4. Place the OPs in the graph onto different devices on different PaddlePaddle runtime according to a placement algorithm and the device constraints specified by the user.
5. Partition the graph according to runtime boundaries and add `send` / `recv` OP pair on the runtime boundaries.
The code above is a typical local training program, the "Training Program" is built using helper functions such as
`fluid.layer.fc`. The training is done by calling `Executor.run`
iteratively.
For more details, the implementation of IR is [Program](../program.md), and `ProgramDesc` is the protobuf type.
[Executor](../executor.md) simply runs the `ProgramDesc`. For local training you generally use
`Executor` to run the program locally. For any kind of distributed training, you can use
`RemoteExecutor` to specify desired distributed training method with some optional arguments.
### Distributed Transpiler
The Distributed Transpiler automatically converts the IR (in protobuf format) to partitioned IRs. Then
the Remote Executor dispatches the new IRs to Remote Executors across the cluster.
Below are the steps that are followed :
1. User only need to change `Executor` to `RemoteExecutor` to change local program to distributed program.
1. `RemoteExecutor` calls `Distributed Transpiler` to "transpile" user's program to several IRs representing a
distributed training program:
1. Parse configurations from `RemoteExecutor`.
1. Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming.
1. Partition the `ProgramDesc` according to type and add `send` / `recv` OP pair on the boundaries. Take
DataParallelism type for example, it removes the optimization operators and add a `send` OP to the
"trainer" role, then add the optimization operators to the parameter server role within the `recv` OP.
1. Dispatch the partitioned graph to different `RemoteExecutor` in the cluster.
1. `RemoteExecutor` on each node run the received `ProgramDesc` utill the end.
### RemoteExecutor
As shown in the graph, `RemoteExecutor.run` sends the IR to the cluster for Execution.
You can also use parameter `fetch_list` to interactively fetch variable back to local for
log printing.
The Python `RemoteExecutor` is derived from `Executor` class.
```python
exe = RemoteExecutor(
feed=feeder.feed(data),
fetch_list=[avg_cost],
job_desc=JobDesc(
jobname,
num_trainer,
num_pserver,
cpu_per_trainer,
gpu_per_trainer,
mem_per_trainer,
cpu_per_pserver,
mem_per_pserver
))
for data in train_reader():
loss, acc = exe.run(trainer_prog,
feed=feeder.feed(data),
fetch_list=[avg_cost])
```
6. Dispatch the partitioned graph to different PaddlePaddle runtimes.
`JobDesc` object describe the distributed job resource specification to run on
Cluster environment.
7. PaddlePaddle runtimes with the `fetch` OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python.
<img src="src/remote_executor.png"/>
The output IRs will be cached to optimize the conversion latency.
to a server in the cluster which executes `RemoteExecutor.listen`. This server is responsible
to start the final Kubernetes Jobs to run the different role of `ProgramDesc`.
#### Placement Algorithm
### Placement Algorithm
Our first implementation will only support "trainer-parameter server" placement: the parameters, initializers, and optimizers are all placed on the PaddlePaddle runtimes with the parameter server role. Everything else will be placed on the PaddlePaddle runtimes with the trainer role. This has the same functionality as the "trainer-parameter server" architecture of PaddlePaddle v0.10.0, but is more generic and flexible.
In the future, a more general placement algorithm should be implemented, which makes placements according to the input IR, and a model of device computation time and device communication time. Model parallelism requires the generic placement algorithm.
### PaddlePaddle Runtime
The PaddlePaddle runtime owns multiple devices (e.g., CPUs, GPUs) and runs the IR. The runtime does not need to do OP placement since it is already done by the converter.
### Local Training Architecture
The local training architecture will be the same as the distributed training architecture, the difference is that everything runs locally, and there is just one PaddlePaddle runtime:
...
...
@@ -132,9 +176,18 @@ The local training architecture will be the same as the distributed training arc
### Training Data
In PaddlePaddle v0.10.0, training data is typically read with a [data reader](../reader/README.md) from Python. This approach is no longer efficient when training in a distributed fashion since the Python process no longer runs on the same node with the trainer processes. The Python reader will need to read from the distributed filesystem (assuming it has the required access) and send to the trainers, doubling the network traffic.
When doing distributed training, the user can still use Python data reader: the training data are sent with `session.eval`. However this should be used for debugging purpose only. The users are encouraged to use the read data OPs.
In PaddlePaddle v0.10.0, training data is typically read
with [data reader](../reader/README.md) from Python. This approach is
no longer efficient when training distributedly since the Python
process no longer runs on the same node with the trainer processes,
the Python reader will need to read from the distributed filesystem
(assuming it has the access) and send to the trainers, doubling the
network traffic.
When doing distributed training, the user can still use Python data
reader: the training data are sent with `Executor.run`. However, should
be used for debugging purpose only. The users are encouraged to use
<p>The user can not directly specify the parameter update rule for the parameter server in the Python module, since the parameter server does not use the same computation definition as the trainer. Instead, the update rule is baked inside the parameter server. The user can not specify the update rule explicitly.</p>
<p>This could be fixed by making the parameter server run the same computation definition as the trainer (the user’s Python module). For a detailed explanation, refer to this document -
<aclass="reference internal"href="parameter_server.html"><spanclass="doc">Design Doc: Operation Graph Based Parameter Server</span></a></p>
<p>This could be fixed by making the parameter server also run an IR, which can be different to the trainer side
For a detailed explanation, refer to this document -
<spanid="distributed-training-architecture"></span><h2>Distributed Training Architecture<aclass="headerlink"href="#distributed-training-architecture"title="永久链接至标题">¶</a></h2>
<p>The revamped distributed training architecture can address the above discussed limitations. Below is the illustration of how it does so:</p>
<p>PaddlePaddle Python is the Python library that user’s Python code invokes, to read the data. build the neural network topology, start training, etc.</p>
<spanclass="nb">input</span><spanclass="o">=</span><spanclass="n">paddle</span><spanclass="o">.</span><spanclass="n">op</span><spanclass="o">.</span><spanclass="n">recordIO</span><spanclass="p">(</span><spanclass="s2">"/home/data/mnist.recordio"</span><spanclass="p">)</span><spanclass="c1"># file stored on the cluster</span>
<p>Python API is the Python library that user’s Python code invokes, to read the data, build the neural network topology, and start training, etc.</p>
<p>The above code is what a typical Python trainer code is, the neural network topology is built using the helper functions such as <codeclass="docutils literal"><spanclass="pre">paddle.layer.fc</span></code>. Training is done by calling <codeclass="docutils literal"><spanclass="pre">session.eval</span></code> iteratively.</p>
<p>As shown in the graph, <codeclass="docutils literal"><spanclass="pre">session.eval</span></code> sends the IR and the evaluation inputs or targets to the PaddlePaddle cluster for evaluation.
The targets can be any variable in the computation graph. When the target is say, the <codeclass="docutils literal"><spanclass="pre">optimizer</span></code> variable, the neural network will be optimized once. When the target is the <codeclass="docutils literal"><spanclass="pre">cost</span></code> variable, <codeclass="docutils literal"><spanclass="pre">session.eval</span></code> returns the cost value. Based on what the target is, an appropriate action is taken.</p>
<p>The Python <codeclass="docutils literal"><spanclass="pre">session</span></code> is a wrapper of the C++ <codeclass="docutils literal"><spanclass="pre">Session</span></code> class. For more information about <codeclass="docutils literal"><spanclass="pre">Session</span></code>, refer to this document - <aclass="reference internal"href="session.html"><spanclass="doc">Design Doc: Session</span></a>.</p>
<p>The code above is a typical local training program, the “Training Program” is built using helper functions such as
<codeclass="docutils literal"><spanclass="pre">fluid.layer.fc</span></code>. The training is done by calling <codeclass="docutils literal"><spanclass="pre">Executor.run</span></code>
iteratively.</p>
<p>For more details, the implementation of IR is <aclass="reference internal"href="../program.html"><spanclass="doc">Program</span></a>, and <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code> is the protobuf type.</p>
<p><aclass="reference internal"href="../executor.html"><spanclass="doc">Executor</span></a> simply runs the <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code>. For local training you generally use
<codeclass="docutils literal"><spanclass="pre">Executor</span></code> to run the program locally. For any kind of distributed training, you can use
<codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> to specify desired distributed training method with some optional arguments.</p>
<p>The PaddlePaddle converter automatically converts the IR in the request (IR and evaluation inputs/targets) from PaddlePaddle Python to partitioned IRs and dispatches the new IRs and evaluation inputs/targets to different PaddlePaddle runtimes. Below are the steps that are followed :</p>
<p>The Distributed Transpiler automatically converts the IR (in protobuf format) to partitioned IRs. Then
the Remote Executor dispatches the new IRs to Remote Executors across the cluster.
Below are the steps that are followed :</p>
<olclass="simple">
<li>Add a <codeclass="docutils literal"><spanclass="pre">feed</span></code> OP that feeds the eval inputs, and a <codeclass="docutils literal"><spanclass="pre">fetch</span></code> OP that fetches the eval targets to the IR.</li>
<li>Extract a new computation (sub)graph with the <codeclass="docutils literal"><spanclass="pre">feed</span></code> and <codeclass="docutils literal"><spanclass="pre">fetch</span></code> OPs as the boundary. The runtime does not need to run the OP that is not dependent on the <codeclass="docutils literal"><spanclass="pre">fetch</span></code> OP.</li>
<li>Optimize the computation graph.</li>
<li>Place the OPs in the graph onto different devices on different PaddlePaddle runtime according to a placement algorithm and the device constraints specified by the user.</li>
<li>Partition the graph according to runtime boundaries and add <codeclass="docutils literal"><spanclass="pre">send</span></code> / <codeclass="docutils literal"><spanclass="pre">recv</span></code> OP pair on the runtime boundaries.</li>
<li>Dispatch the partitioned graph to different PaddlePaddle runtimes.</li>
<li>PaddlePaddle runtimes with the <codeclass="docutils literal"><spanclass="pre">fetch</span></code> OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python.</li>
<li>User only need to change <codeclass="docutils literal"><spanclass="pre">Executor</span></code> to <codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> to change local program to distributed program.</li>
<li><codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> calls <codeclass="docutils literal"><spanclass="pre">Distributed</span><spanclass="pre">Transpiler</span></code> to “transpile” user’s program to several IRs representing a
distributed training program:<ol>
<li>Parse configurations from <codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code>.</li>
<li>Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming.</li>
<li>Partition the <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code> according to type and add <codeclass="docutils literal"><spanclass="pre">send</span></code> / <codeclass="docutils literal"><spanclass="pre">recv</span></code> OP pair on the boundaries. Take
DataParallelism type for example, it removes the optimization operators and add a <codeclass="docutils literal"><spanclass="pre">send</span></code> OP to the
“trainer” role, then add the optimization operators to the parameter server role within the <codeclass="docutils literal"><spanclass="pre">recv</span></code> OP.</li>
</ol>
</li>
<li>Dispatch the partitioned graph to different <codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> in the cluster.</li>
<li><codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> on each node run the received <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code> utill the end.</li>
</ol>
<p>The output IRs will be cached to optimize the conversion latency.</p>
<p>As shown in the graph, <codeclass="docutils literal"><spanclass="pre">RemoteExecutor.run</span></code> sends the IR to the cluster for Execution.
You can also use parameter <codeclass="docutils literal"><spanclass="pre">fetch_list</span></code> to interactively fetch variable back to local for
log printing.</p>
<p>The Python <codeclass="docutils literal"><spanclass="pre">RemoteExecutor</span></code> is derived from <codeclass="docutils literal"><spanclass="pre">Executor</span></code> class.</p>
<p><codeclass="docutils literal"><spanclass="pre">JobDesc</span></code> object describe the distributed job resource specification to run on
Cluster environment.</p>
<p><imgsrc="src/remote_executor.png"/></p>
<p><codeclass="docutils literal"><spanclass="pre">RemoteExecutor.run</span></code> sends the <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code> and
to a server in the cluster which executes <codeclass="docutils literal"><spanclass="pre">RemoteExecutor.listen</span></code>. This server is responsible
to start the final Kubernetes Jobs to run the different role of <codeclass="docutils literal"><spanclass="pre">ProgramDesc</span></code>.</p>
<p>Our first implementation will only support “trainer-parameter server” placement: the parameters, initializers, and optimizers are all placed on the PaddlePaddle runtimes with the parameter server role. Everything else will be placed on the PaddlePaddle runtimes with the trainer role. This has the same functionality as the “trainer-parameter server” architecture of PaddlePaddle v0.10.0, but is more generic and flexible.</p>
<p>In the future, a more general placement algorithm should be implemented, which makes placements according to the input IR, and a model of device computation time and device communication time. Model parallelism requires the generic placement algorithm.</p>
<p>The PaddlePaddle runtime owns multiple devices (e.g., CPUs, GPUs) and runs the IR. The runtime does not need to do OP placement since it is already done by the converter.</p>
<spanid="local-training-architecture"></span><h3>Local Training Architecture<aclass="headerlink"href="#local-training-architecture"title="永久链接至标题">¶</a></h3>
<p>The local training architecture will be the same as the distributed training architecture, the difference is that everything runs locally, and there is just one PaddlePaddle runtime:</p>
...
...
@@ -327,8 +374,17 @@ The targets can be any variable in the computation graph. When the target is say
<p>In PaddlePaddle v0.10.0, training data is typically read with a <aclass="reference internal"href="../reader/README.html"><spanclass="doc">data reader</span></a> from Python. This approach is no longer efficient when training in a distributed fashion since the Python process no longer runs on the same node with the trainer processes. The Python reader will need to read from the distributed filesystem (assuming it has the required access) and send to the trainers, doubling the network traffic.</p>
<p>When doing distributed training, the user can still use Python data reader: the training data are sent with <codeclass="docutils literal"><spanclass="pre">session.eval</span></code>. However this should be used for debugging purpose only. The users are encouraged to use the read data OPs.</p>
<p>In PaddlePaddle v0.10.0, training data is typically read
with <aclass="reference internal"href="../reader/README.html"><spanclass="doc">data reader</span></a> from Python. This approach is
no longer efficient when training distributedly since the Python
process no longer runs on the same node with the trainer processes,
the Python reader will need to read from the distributed filesystem
(assuming it has the access) and send to the trainers, doubling the
network traffic.</p>
<p>When doing distributed training, the user can still use Python data
reader: the training data are sent with <codeclass="docutils literal"><spanclass="pre">Executor.run</span></code>. However, should
be used for debugging purpose only. The users are encouraged to use
the <aclass="reference external"href="design/design/program.md">ProgramDesc</a>, <codeclass="docutils literal"><spanclass="pre">eval()</span></code> will infer the
ProgramDesc from the given targets and run the PaddlePaddle
program. Please
see
<aclass="reference external"href="/design/refactor/distributed_architecture.html#local-training-architecture">this graph</a> for
the detailed illustration for the local session
and
<aclass="reference external"href="/design/refactor/distributed_architecture.html#distributed-training-architecture">this graph</a> for
the detailed illustration for the remote session.</p>
</li>
<li><pclass="first"><em>feed_dict</em>: a dictionary that contains the tensors which override
the edges of the computation graph.</p>
<p>feed_dict not only can provide the input data, it can override any
<p>Creates a new session. One session owns one global scope, so creating
multiple sessions will create different scopes.</p>
<ulclass="simple">
<li><em>devices</em>: a single <codeclass="docutils literal"><spanclass="pre">string</span></code> or a list of <codeclass="docutils literal"><spanclass="pre">string</span></code> of device names,
the corresponding devices will be the computation devices for
<codeclass="docutils literal"><spanclass="pre">eval()</span></code>. If not specified, all available devices (e.g., all GPUs)
will be used. The user doesn’t need to specify the CPU device since
it will be always used. Multiple sessions can use the same device.</li>
<divclass="highlight-Python"><divclass="highlight"><pre><span></span><spanclass="n">reader</span><spanclass="o">=</span><spanclass="n">paddle</span><spanclass="o">.</span><spanclass="n">reader</span><spanclass="o">.</span><spanclass="n">recordio</span><spanclass="p">(</span><spanclass="s2">"/pfs/home/peter/mnist-train-*"</span><spanclass="p">)</span><spanclass="c1"># data stored on Paddle Cloud</span>
Built with <ahref="http://sphinx-doc.org/">Sphinx</a> using a <ahref="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <ahref="https://readthedocs.org">Read the Docs</a>.