未验证 提交 32b09b51 编写于 作者: 武毅 提交者: GitHub

Merge pull request #5776 from typhoonzero/update_refactor_dist_train_doc

Update design of dist train refactor
......@@ -52,8 +52,9 @@ The IR for PaddlePaddle after refactoring is called a `Block`, it specifies the
The user can not directly specify the parameter update rule for the parameter server in the Python module, since the parameter server does not use the same computation definition as the trainer. Instead, the update rule is baked inside the parameter server. The user can not specify the update rule explicitly.
This could be fixed by making the parameter server run the same computation definition as the trainer (the user's Python module). For a detailed explanation, refer to this document -
[Design Doc: Operation Graph Based Parameter Server](./parameter_server.md)
This could be fixed by making the parameter server also run an IR, which can be different to the trainer side
For a detailed explanation, refer to this document -
[Design Doc: Parameter Server](./parameter_server.md)
## Distributed Training Architecture
......@@ -61,68 +62,111 @@ The revamped distributed training architecture can address the above discussed l
<img src="src/distributed_architecture.png"/>
The major components in the architecture are: *PaddlePaddle Python*, *PaddlePaddle converter* and *PaddlePaddle runtime*.
The major components are: *Python API*, *Distribute Transpiler* and *Remote Executor*.
### PaddlePaddle Python
### Python API
PaddlePaddle Python is the Python library that user's Python code invokes, to read the data. build the neural network topology, start training, etc.
Python API is the Python library that user's Python code invokes, to read the data, build the neural network topology, and start training, etc.
```Python
paddle.init()
input = paddle.op.recordIO("/home/data/mnist.recordio") # file stored on the cluster
img, label = input[0], input[1]
hidden = paddle.layer.fc(input=img, size=200, act=paddle.activation.Tanh())
prediction = paddle.layer.fc(input=img, size=10, act=paddle.activation.Softmax())
cost = paddle.layer.classification_cost(input=prediction, label=label)
optimizer = paddle.optimizer.SGD(cost, learning_rate=0.01)
session = paddle.session.NewRemote(num_trainer=3, num_ps=2, GPU_per_trainer=1)
for i in range(1000):
_, cost_val = session.eval(targets=[cost, optimizer])
print cost_val
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
...
predict = fluid.layers.fc(input=conv_pool_2, size=10, act="softmax")
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
optimizer = fluid.optimizer.Adam(learning_rate=0.01)
optimizer.minimize(avg_cost)
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=500),
batch_size=BATCH_SIZE)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
for pass_id in range(10):
for data in train_reader():
loss, acc = exe.run(trainer_prog,
feed=feeder.feed(data),
fetch_list=[avg_cost])
```
The above code is what a typical Python trainer code is, the neural network topology is built using the helper functions such as `paddle.layer.fc`. Training is done by calling `session.eval` iteratively.
#### session.eval
As shown in the graph, `session.eval` sends the IR and the evaluation inputs or targets to the PaddlePaddle cluster for evaluation.
The targets can be any variable in the computation graph. When the target is say, the `optimizer` variable, the neural network will be optimized once. When the target is the `cost` variable, `session.eval` returns the cost value. Based on what the target is, an appropriate action is taken.
The Python `session` is a wrapper of the C++ `Session` class. For more information about `Session`, refer to this document - [Design Doc: Session](./session.md).
### PaddlePaddle Converter
The PaddlePaddle converter automatically converts the IR in the request (IR and evaluation inputs/targets) from PaddlePaddle Python to partitioned IRs and dispatches the new IRs and evaluation inputs/targets to different PaddlePaddle runtimes. Below are the steps that are followed :
1. Add a `feed` OP that feeds the eval inputs, and a `fetch` OP that fetches the eval targets to the IR.
2. Extract a new computation (sub)graph with the `feed` and `fetch` OPs as the boundary. The runtime does not need to run the OP that is not dependent on the `fetch` OP.
3. Optimize the computation graph.
4. Place the OPs in the graph onto different devices on different PaddlePaddle runtime according to a placement algorithm and the device constraints specified by the user.
5. Partition the graph according to runtime boundaries and add `send` / `recv` OP pair on the runtime boundaries.
The code above is a typical local training program, the "Training Program" is built using helper functions such as
`fluid.layer.fc`. The training is done by calling `Executor.run`
iteratively.
For more details, the implementation of IR is [Program](../program.md), and `ProgramDesc` is the protobuf type.
[Executor](../executor.md) simply runs the `ProgramDesc`. For local training you generally use
`Executor` to run the program locally. For any kind of distributed training, you can use
`RemoteExecutor` to specify desired distributed training method with some optional arguments.
### Distributed Transpiler
The Distributed Transpiler automatically converts the IR (in protobuf format) to partitioned IRs. Then
the Remote Executor dispatches the new IRs to Remote Executors across the cluster.
Below are the steps that are followed :
1. User only need to change `Executor` to `RemoteExecutor` to change local program to distributed program.
1. `RemoteExecutor` calls `Distributed Transpiler` to "transpile" user's program to several IRs representing a
distributed training program:
1. Parse configurations from `RemoteExecutor`.
1. Determine the type of distributed program, can be DataParallelism, ModelParallelism or Streaming.
1. Partition the `ProgramDesc` according to type and add `send` / `recv` OP pair on the boundaries. Take
DataParallelism type for example, it removes the optimization operators and add a `send` OP to the
"trainer" role, then add the optimization operators to the parameter server role within the `recv` OP.
1. Dispatch the partitioned graph to different `RemoteExecutor` in the cluster.
1. `RemoteExecutor` on each node run the received `ProgramDesc` utill the end.
### RemoteExecutor
As shown in the graph, `RemoteExecutor.run` sends the IR to the cluster for Execution.
You can also use parameter `fetch_list` to interactively fetch variable back to local for
log printing.
The Python `RemoteExecutor` is derived from `Executor` class.
```python
exe = RemoteExecutor(
feed=feeder.feed(data),
fetch_list=[avg_cost],
job_desc=JobDesc(
jobname,
num_trainer,
num_pserver,
cpu_per_trainer,
gpu_per_trainer,
mem_per_trainer,
cpu_per_pserver,
mem_per_pserver
))
for data in train_reader():
loss, acc = exe.run(trainer_prog,
feed=feeder.feed(data),
fetch_list=[avg_cost])
```
6. Dispatch the partitioned graph to different PaddlePaddle runtimes.
`JobDesc` object describe the distributed job resource specification to run on
Cluster environment.
7. PaddlePaddle runtimes with the `fetch` OP reports evaluation results back to the converter, the converter reports the evaluation results back to the PaddlePaddle Python.
<img src="src/remote_executor.png"/>
The output IRs will be cached to optimize the conversion latency.
`RemoteExecutor.run` sends the `ProgramDesc` and
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/develop/doc/autoscale/README.md#training-job-resource)
to a server in the cluster which executes `RemoteExecutor.listen`. This server is responsible
to start the final Kubernetes Jobs to run the different role of `ProgramDesc`.
#### Placement Algorithm
### Placement Algorithm
Our first implementation will only support "trainer-parameter server" placement: the parameters, initializers, and optimizers are all placed on the PaddlePaddle runtimes with the parameter server role. Everything else will be placed on the PaddlePaddle runtimes with the trainer role. This has the same functionality as the "trainer-parameter server" architecture of PaddlePaddle v0.10.0, but is more generic and flexible.
In the future, a more general placement algorithm should be implemented, which makes placements according to the input IR, and a model of device computation time and device communication time. Model parallelism requires the generic placement algorithm.
### PaddlePaddle Runtime
The PaddlePaddle runtime owns multiple devices (e.g., CPUs, GPUs) and runs the IR. The runtime does not need to do OP placement since it is already done by the converter.
### Local Training Architecture
The local training architecture will be the same as the distributed training architecture, the difference is that everything runs locally, and there is just one PaddlePaddle runtime:
......@@ -132,9 +176,18 @@ The local training architecture will be the same as the distributed training arc
### Training Data
In PaddlePaddle v0.10.0, training data is typically read with a [data reader](../reader/README.md) from Python. This approach is no longer efficient when training in a distributed fashion since the Python process no longer runs on the same node with the trainer processes. The Python reader will need to read from the distributed filesystem (assuming it has the required access) and send to the trainers, doubling the network traffic.
When doing distributed training, the user can still use Python data reader: the training data are sent with `session.eval`. However this should be used for debugging purpose only. The users are encouraged to use the read data OPs.
In PaddlePaddle v0.10.0, training data is typically read
with [data reader](../reader/README.md) from Python. This approach is
no longer efficient when training distributedly since the Python
process no longer runs on the same node with the trainer processes,
the Python reader will need to read from the distributed filesystem
(assuming it has the access) and send to the trainers, doubling the
network traffic.
When doing distributed training, the user can still use Python data
reader: the training data are sent with `Executor.run`. However, should
be used for debugging purpose only. The users are encouraged to use
the read data OPs.
## References:
......
# Design Doc: Operation Graph Based Parameter Server
# Design Doc: Parameter Server
## Abstract
......@@ -10,7 +10,7 @@ different purposes.
## Background
The previous implementations of the parameter server does not run a
subgraph. parameter initialization, optimizer computation, network
fluid sub-program. Parameter initialization, optimizer computation, network
communication and checkpointing are implemented twice on both the
trainer and the parameter server.
......@@ -23,10 +23,10 @@ server becomes a natural extension.
## Design
### Graph Converter
### Distributed Transpiler
The *graph converter* converts the user-defined operation (OP) graph
into subgraphs to be scheduled on different nodes with the following
The *Distributed Transpiler* converts the user-defined fluid program
into sub-programs to be scheduled on different nodes with the following
steps:
1. OP placement: the OPs will be placed on different nodes according
......@@ -34,7 +34,6 @@ steps:
time. Currently we will use a simple heuristic that puts parameter
varable on parameter server workers and everything else on trainer
workers.
1. Add communication OPs to enable the communication between nodes.
We will need these OPs: *Send*, *Recv*, *Enqueue*, *Dequeue*.
......@@ -48,8 +47,8 @@ After converting:
<img src="src/dist-graph.png" width="700"/>
1. The parameter variable W and it's optimizer subgraph are placed on the parameter server.
1. Operators are added to the subgraphs.
1. The parameter variable W and it's optimizer program are placed on the parameter server.
1. Operators are added to the program.
- *Send* sends data to the connected *Recv* operator. The
scheduler on the receive node will only schedule *Recv* operator
to run when the *Send* operator has ran (the *Send* OP will mark
......@@ -64,39 +63,30 @@ After converting:
### Benefits
- Model parallelism become easier to implement: it's an extension to
the trainer - parameter server approach. we already have the
communication OPs, but need to extend the graph converter's
placement functionality.
the trainer - parameter server approach. We can have several "Transpilers"
to achieve different goals.
- User-defined optimizer is easier to add - user can now express it as
a subgraph.
a sub-program.
- No more duplication logic inside the trainer and the parameter
server mentioned in the background section.
### Challenges
- It might be hard for the graph converter to cut a general graph
(without any hint for which subgraph is the optimizer). We may need
to label which subgraph inside the OP graph is the optimizer.
- It's important to balance the parameter shards of on multiple
parameter server. If a single parameter is very big (some
word-embedding, fully connected, softmax layer), we need to
automatically partition the single parameter onto different
parameter servers when possible (only element-wise optimizer depends
on the parameter variable).
- In the "Aync SGD" figure, the "W" variable on the parameter server
could be read and wrote concurrently. See
[here](https://github.com/PaddlePaddle/Paddle/pull/6394) for more
details about concurrent program in fluid.
### Discussion
- In the "Aync SGD" figure, the "W" variable on the parameter server
could be read and wrote concurrently, what is our locking strategy?
E.g., each variable have a lock cpp method to be invoked by every
OP, or, have a lock OP.
- Can the Enqueue OP be implemented under our current tensor design
(puts the input tensor into the queue tensor)?
- *Dequeue* OP will have variable numbers of output (depends on the
`min_count` attribute), does our current design support it? (similar
question for the *Add* OP)
......
# Design Doc: Session
## Abstract
The *session* object encapsulates the environment in which the
computation graph is executed.
We will have the *local* session and *remote* session, they offer the
same [interface](#interface). The local session encapsulates the local
runtime environment and the remote session encapsulates the cluster
runtime environment.
The local runtime environment contains:
1. computation devices (i.e., CPU, GPU) handles, and
1. the [scope](../scope.md) which holds all variables.
The remote runtime environment contains:
1. computation devices (i.e., CPU and GPU on node 0, 1) in a cluster,
and
1. the distributed [scope](../scope.md) in a cluster which holds all
variables.
The user can create a remote session on Paddle Cloud and evaluate the
computation graph with it. In this way, the user can control the
remote computation resource in a cluster from his local computer.
## Background
The current design has an implicit global session in which
`paddle.eval()` is executed. The pain point is:
Since the user is not able to explicitly switch between runtime
environments, the user cannot run a topology in two independent
environments.
For example, in reinforcement learning, the user may want to have a
stale model for inference and a fresh model for training, and only
replace the stale model with the fresh model periodically.
Furthermore, we have no concept that encapsulates a remote environment
that executes a computation graph.
We need the session object to address above issues.
## Session
A session is an object that owns the runtime environment. All
computations are executed through `session.eval()`.
### Interface
```python
eval(
targets,
feed_dict=None,
)
```
Evaluates the target Operations or Variables in `targets`.
- *targets*: the evaluation targets. Can be a single Operation or
Variable, or a list with the Operations or Variables as
elements. The value returned by `eval()` has the same shape as the
`target` argument.
The PaddlePaddle program is represented by
the [ProgramDesc](../design/program.md), `eval()` will infer the
ProgramDesc from the given targets and run the PaddlePaddle
program. Please
see
[this graph](./distributed_architecture.md#local-training-architecture) for
the detailed illustration for the local session
and
[this graph](./distributed_architecture.md#distributed-training-architecture) for
the detailed illustration for the remote session.
- *feed_dict*: a dictionary that contains the tensors which override
the edges of the computation graph.
feed_dict not only can provide the input data, it can override any
OP's input as well:
```python
a = pd.constant(2.0, name="a")
b = pd.variable(name="b")
c = pd.mul(a,b)
sess.eval(targets=c, feed_dict={"b":3.0}) # returns 6.0
```
```python
close()
```
Closes the session and releases the scope that the session owns.
### Create a Local Session
```python
session(
devices=None
)
```
Creates a new session. One session owns one global scope, so creating
multiple sessions will create different scopes.
- *devices*: a single `string` or a list of `string` of device names,
the corresponding devices will be the computation devices for
`eval()`. If not specified, all available devices (e.g., all GPUs)
will be used. The user doesn't need to specify the CPU device since
it will be always used. Multiple sessions can use the same device.
#### Example
```Python
a = paddle.constant(1.0)
b = paddle.constant(2.0)
c = a + b
sess = paddle.session(devices=["gpu:0", "gpu:1", "fpga:0"])
sess.eval(c)
sess.close()
```
### Create a Remote Session
```python
create_cloud_job(
name,
num_trainer,
mem_per_trainer,
gpu_per_trainer,
cpu_per_trainer,
num_ps,
mem_per_ps,
cpu_per_ps,
)
```
Creates a Paddle Cloud job. Fails if the job name exists.
```python
get_cloud_job(
name
)
```
Gets a Paddle Cloud job.
```python
remote_session(
job
)
```
- *job*: the Paddle Cloud job.
#### Example
```Python
reader = paddle.reader.recordio("/pfs/home/peter/mnist-train-*") # data stored on Paddle Cloud
image = reader.column(0)
label = reader.column(1)
fc1 = paddle.op.fc(image, size=256, act="sigmoid")
fc2 = paddle.op.fc(fc1, size=10, act="softmax")
cost = paddle.op.cross_entropy(fc2, label)
opt = paddle.optimizer.sgd(cost)
job = paddle.create_cloud_job("test", 3, "1G", 1, 1, 2, "1G", 1)
sess = paddle.remote_ession(job)
for i in range(1000):
sess.eval(opt)
sess.close()
```
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册