# Design Doc: Fluid Distributed Training Architecture

## Abstract

PaddlePaddle version 0.10.0 uses the "trainer-parameter server" architecture. We run multiple instances of trainers (where each trainer runs the same model) and parameter servers for distributed training. This architecture serves well, but has a few limitations:

1. Special code is needed to handle tasks that should run on only a single trainer, e.g., initializing the model or saving the model.

2. Model parallelism is hard: it requires if-else branches conditioned on the trainer ID to partition the model across the trainers, plus hand-written inter-model-shard communication code so that the trainers can exchange data.

3. The user cannot directly specify the parameter update rule: doing so requires modifying the parameter server code and compiling a new binary, which takes a lot of extra effort from researchers. Besides, the training job submission program may not allow running arbitrary binaries.

This design doc discusses PaddlePaddle's new distributed training architecture, which addresses the above-mentioned limitations.

## Analysis

The assumption is that the user writes the trainer program in either Python or C++.

### Limitation 1

There are two basic functionalities in the trainer program:

1. The training logic such as loading / saving the model and printing out the logs.
2. The neural network definition such as the definition of the data layer, the fully connected layer, the cost function and the optimizer.

When we train using PaddlePaddle v0.10.0 in a distributed fashion, multiple instances of the same Python code run on different nodes, so both the training logic and the neural network computation logic are replicated.

The tasks that only need to run once belong to the training logic. Hence, if we replicate only the neural network computation and do **not** replicate the training logic, the limitation mentioned above can be avoided.
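
To make the problem concrete, here is a minimal sketch of the guard code that a fully replicated trainer script tends to need. It is only a simulation in plain Python: `run_trainer`, the `log` list and the trainer IDs are hypothetical names chosen for illustration, not PaddlePaddle APIs.

```python
# Hypothetical simulation: every trainer runs the same script, so tasks that
# must happen exactly once have to be guarded by the trainer ID.

def run_trainer(trainer_id, num_passes, log):
    """Simulate one replica of a replicated training script."""
    if trainer_id == 0:
        # One-off training logic: only one replica may do this.
        log.append("trainer 0: initialize model")
    for pass_id in range(num_passes):
        # Neural network computation: legitimately replicated on every trainer.
        log.append(f"trainer {trainer_id}: train pass {pass_id}")
    if trainer_id == 0:
        # One-off training logic again.
        log.append("trainer 0: save model")


log = []
for tid in range(3):  # three replicas of the same program
    run_trainer(tid, num_passes=1, log=log)
```

If only the computation were replicated, the two `if trainer_id == 0` guards would disappear from the user's code.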

### Limitation 2

Model parallelism means that a single model is partitioned into different components and each node runs one of the components separately. This comes at the extra cost of managing the inter-model-shard communication between nodes.

PaddlePaddle should ideally be able to modify the neural network computation and figure out the support for model parallelism automatically. However, the computation is only specified in Python code which sits outside of PaddlePaddle, hence PaddlePaddle cannot support the feature in this setup.

Similar to how a compiler uses an intermediate representation (IR) so that the programmer does not need to manually optimize their code for most of the cases, we can have an intermediate representation in PaddlePaddle as well. The compiler optimizes the IR as follows:

<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/compiler.png"/>

PaddlePaddle can support model parallelism by converting the IR so that the user no longer needs to manually perform the computation and operations in the Python component:

<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/paddle-compile.png"/>

The IR for PaddlePaddle after refactoring is called a `Block`. It specifies the computation dependency graph and the variables used in the computation.
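
As a rough illustration of what "a dependency graph plus variables" can look like, the following toy model stores a list of operators and the set of variable names they touch, and derives the dependencies from who writes what. The `Op` and `Block` classes here are simplified stand-ins written for this example; they are not the actual PaddlePaddle `Block` or its protobuf definition.

```python
from dataclasses import dataclass, field


@dataclass
class Op:
    type: str
    inputs: list
    outputs: list


@dataclass
class Block:
    """Toy IR block: the variables it uses and the operators it runs in order."""
    vars: set = field(default_factory=set)
    ops: list = field(default_factory=list)

    def append_op(self, type, inputs, outputs):
        self.vars.update(inputs)
        self.vars.update(outputs)
        self.ops.append(Op(type, list(inputs), list(outputs)))

    def dependencies(self):
        """Map each op index to the indices of earlier ops whose outputs it reads."""
        producer = {}  # variable name -> index of the op that last wrote it
        deps = {}
        for i, op in enumerate(self.ops):
            deps[i] = sorted({producer[v] for v in op.inputs if v in producer})
            for v in op.outputs:
                producer[v] = i
        return deps


block = Block()
block.append_op("mul", ["x", "w"], ["xw"])
block.append_op("elementwise_add", ["xw", "b"], ["y"])
block.append_op("softmax", ["y"], ["predict"])
deps = block.dependencies()
```

Because the dependencies are explicit data rather than Python control flow, a framework can inspect and rewrite them, which is what the partitioning described in this document relies on.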

### Limitation 3

The user cannot directly specify the parameter update rule for the parameter server in the Python module, since the parameter server does not use the same computation definition as the trainer. Instead, the update rule is baked into the parameter server.

This could be fixed by making the parameter server also run an IR, which can be different from the one on the trainer side. For a detailed explanation, refer to [Design Doc: Parameter Server](./parameter_server.md).
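
The idea can be sketched with a tiny interpreter: if the update rule is data (a small program) instead of compiled-in server code, the user can change it without building a new binary. Everything below is a hypothetical illustration; the op names `sgd` and `clip`, the tuple layout and `run_update_program` are assumptions made for this example and do not describe the real parameter server.

```python
def run_update_program(program, params, grads):
    """Apply a user-supplied list of (op, param_name, attrs) steps in order."""
    for op, name, attrs in program:
        if op == "sgd":
            # Plain gradient descent step.
            params[name] -= attrs["lr"] * grads[name]
        elif op == "clip":
            # Clamp the parameter into [min, max].
            params[name] = max(attrs["min"], min(attrs["max"], params[name]))
        else:
            raise ValueError(f"unknown op: {op}")
    return params


params = {"w": 1.0}
grads = {"w": 4.0}

# The update rule is chosen by the user and shipped to the server as data.
program = [
    ("sgd", "w", {"lr": 0.5}),
    ("clip", "w", {"min": -0.5, "max": 0.5}),
]
run_update_program(program, params, grads)
```

Swapping the rule then means sending a different `program`, with no change to the server code itself.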

## Distributed Training Architecture

The revamped distributed training architecture addresses the limitations discussed above. The following figure illustrates how:

<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/distributed_architecture.png"/>

The major components are: *Python API*, *Distribute Transpiler* and *Remote Executor*.

### Python API

The Python API is the Python library that the user's Python code invokes to read the data, build the neural network topology, start training, etc.

```Python
import paddle
import paddle.fluid as fluid

BATCH_SIZE = 64  # example value; the original snippet leaves it undefined

# Network definition: data layers, (elided) hidden layers, cost and optimizer.
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
...
predict = fluid.layers.fc(input=conv_pool_2, size=10, act="softmax")
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
optimizer = fluid.optimizer.Adam(learning_rate=0.01)
optimizer.minimize(avg_cost)

train_reader = paddle.batch(
    paddle.reader.shuffle(
        paddle.dataset.mnist.train(), buf_size=500),
    batch_size=BATCH_SIZE)

place = fluid.CPUPlace()
exe = fluid.Executor(place)
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)

# Initialize the parameters once, then run the training program repeatedly.
exe.run(fluid.default_startup_program())
trainer_prog = fluid.default_main_program()

for pass_id in range(10):
    for data in train_reader():
        # fetch_list has one entry, so exactly one value comes back.
        loss, = exe.run(trainer_prog,
                        feed=feeder.feed(data),
                        fetch_list=[avg_cost])
```

The code above is a typical local training program. The "Training Program" is built using helper functions such as `fluid.layers.fc`, and training is done by calling `Executor.run` iteratively.

For more details, the implementation of the IR is [Program](../program.md), and `ProgramDesc` is its protobuf type.

[Executor](../executor.md) simply runs the `ProgramDesc`. For local training you generally use
`Executor` to run the program locally. For any kind of distributed training, you can use
`RemoteExecutor` and specify the desired distributed training method with some optional arguments.

### Distributed Transpiler

The Distributed Transpiler automatically converts the IR (in protobuf format) to partitioned IRs. Then
the Remote Executor dispatches the new IRs to Remote Executors across the cluster.
The steps are as follows:

1. The user only needs to change `Executor` to `RemoteExecutor` to turn a local program into a distributed program.
1. `RemoteExecutor` calls the `Distributed Transpiler` to "transpile" the user's program into several IRs representing a
   distributed training program:
   1. Parse configurations from `RemoteExecutor`.
   1. Determine the type of distributed program, which can be DataParallelism, ModelParallelism or Streaming.
   1. Partition the `ProgramDesc` according to the type and add a `send` / `recv` OP pair on the boundaries. Taking the
      DataParallelism type as an example, it removes the optimization operators and adds a `send` OP to the
      "trainer" role, then adds the optimization operators to the parameter server role within the `recv` OP.
1. Dispatch the partitioned graph to the different `RemoteExecutor`s in the cluster.
1. The `RemoteExecutor` on each node runs the received `ProgramDesc` until the end.
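
The DataParallelism partitioning step can be sketched on a toy op list as follows. This is a simplified illustration under stated assumptions, not the transpiler's implementation: ops are plain dicts, `OPTIMIZE_OPS` is a made-up set of optimizer op types, gradients are recognised by an `@GRAD` name suffix, and the `sub_ops` field standing in for "within the `recv` OP" is hypothetical.

```python
# Assumption for this sketch: these op types count as optimization operators.
OPTIMIZE_OPS = {"sgd", "adam"}


def transpile_data_parallel(ops):
    """Split a flat op list into a trainer program and a pserver program."""
    trainer, optimize, grads = [], [], []
    for op in ops:
        if op["type"] in OPTIMIZE_OPS:
            # Optimization ops are removed from the trainer side.
            optimize.append(op)
            grads.extend(v for v in op["inputs"] if v.endswith("@GRAD"))
        else:
            trainer.append(op)
    # The trainer sends its gradients instead of applying them locally.
    trainer.append({"type": "send", "inputs": grads, "outputs": []})
    # The pserver receives the gradients and runs the optimization ops
    # nested inside the recv op.
    pserver = [{"type": "recv", "inputs": [], "outputs": grads,
                "sub_ops": optimize}]
    return trainer, pserver


program = [
    {"type": "mul", "inputs": ["x", "w"], "outputs": ["y"]},
    {"type": "mul_grad", "inputs": ["y@GRAD", "x"], "outputs": ["w@GRAD"]},
    {"type": "sgd", "inputs": ["w", "w@GRAD"], "outputs": ["w"]},
]
trainer_prog, pserver_prog = transpile_data_parallel(program)
```

Here the trainer program ends with a `send` of `w@GRAD`, and the `sgd` op only appears on the parameter server side.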

### RemoteExecutor

As shown in the figure, `RemoteExecutor.run` sends the IR to the cluster for execution.
You can also use the `fetch_list` parameter to interactively fetch variables back to the local process for
log printing.

The Python `RemoteExecutor` is derived from the `Executor` class.

```python
# Proposed usage: RemoteExecutor and JobDesc are the interfaces described in
# this design doc.
exe = RemoteExecutor(
    feed=feeder.feed(data),
    fetch_list=[avg_cost],
    job_desc=JobDesc(
        jobname,
        num_trainer,
        num_pserver,
        cpu_per_trainer,
        gpu_per_trainer,
        mem_per_trainer,
        cpu_per_pserver,
        mem_per_pserver
    ))

for data in train_reader():
    # fetch_list has one entry, so exactly one value comes back.
    loss, = exe.run(trainer_prog,
                    feed=feeder.feed(data),
                    fetch_list=[avg_cost])
```

The `JobDesc` object describes the resource specification of the distributed job to run in the
cluster environment.
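
One possible shape for such an object is a plain record of the fields named in the snippet above. The sketch below is an assumption-laden illustration: the field names come from this document, but the types, the memory string format and the `total_cpu` helper are hypothetical additions for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JobDesc:
    """Hypothetical resource specification for one distributed training job."""
    jobname: str
    num_trainer: int
    num_pserver: int
    cpu_per_trainer: int
    gpu_per_trainer: int
    mem_per_trainer: str  # e.g. "8Gi"; the unit format is an assumption
    cpu_per_pserver: int
    mem_per_pserver: str

    def total_cpu(self):
        """CPU cores requested by all trainers and parameter servers together."""
        return (self.num_trainer * self.cpu_per_trainer
                + self.num_pserver * self.cpu_per_pserver)


job = JobDesc(
    jobname="mnist-demo",
    num_trainer=4,
    num_pserver=2,
    cpu_per_trainer=2,
    gpu_per_trainer=1,
    mem_per_trainer="8Gi",
    cpu_per_pserver=1,
    mem_per_pserver="4Gi",
)
```

Keeping the record immutable (`frozen=True`) is a design choice for the sketch: the job description is meant to be sent to the cluster as-is.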

<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/remote_executor.png" width="500" align="center" />

`RemoteExecutor.run` sends the `ProgramDesc` and
[TrainingJob](https://github.com/PaddlePaddle/cloud/blob/unreleased-tpr/doc/autoscale/README.md#training-job-resource)
to a server in the cluster which executes `RemoteExecutor.listen`. This server is responsible
for starting the final Kubernetes Jobs that run the different roles of the `ProgramDesc` from `ConfigMap`.


### Placement Algorithm

Our first implementation will only support "trainer-parameter server" placement: the parameters, initializers, and optimizers are all placed on the PaddlePaddle runtimes with the parameter server role. Everything else will be placed on the PaddlePaddle runtimes with the trainer role. This has the same functionality as the "trainer-parameter server" architecture of PaddlePaddle v0.10.0, but is more generic and flexible.
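
The rule in the paragraph above is simple enough to state as a small function. This is a sketch only: the `(name, kind)` pairs, the kind labels and `place_items` are hypothetical names chosen for illustration, not part of the PaddlePaddle runtime.

```python
# Kinds that the first implementation places on the parameter server role.
PSERVER_KINDS = {"parameter", "initializer", "optimizer"}


def place_items(items):
    """Assign each (name, kind) item to the "pserver" or "trainer" role."""
    placement = {"pserver": [], "trainer": []}
    for name, kind in items:
        role = "pserver" if kind in PSERVER_KINDS else "trainer"
        placement[role].append(name)
    return placement


items = [
    ("fc_w", "parameter"),
    ("uniform_init", "initializer"),
    ("fc", "layer"),
    ("cross_entropy", "layer"),
    ("adam", "optimizer"),
]
placement = place_items(items)
```

A more general algorithm would replace the fixed `PSERVER_KINDS` test with a decision based on the IR and a cost model.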

In the future, a more general placement algorithm should be implemented, which makes placements according to the input IR, and a model of device computation time and device communication time. Model parallelism requires the generic placement algorithm.


### Local Training Architecture

The local training architecture will be the same as the distributed training architecture. The difference is that everything runs locally, and there is just one PaddlePaddle runtime:

<img src="https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/doc/fluid/images/local_architecture.png"/>


### Training Data

In PaddlePaddle v0.10.0, training data is typically read
with a [data reader](../reader/README.md) from Python. This approach is
no longer efficient for distributed training: the Python
process no longer runs on the same node as the trainer processes, so
the Python reader has to read from the distributed filesystem
(assuming it has access) and then send the data to the trainers, doubling the
network traffic.

When doing distributed training, the user can still use the Python data
reader: the training data are sent with `Executor.run`. However, this should
be used for debugging purposes only. Users are encouraged to use
the read data OPs.

