# Design Doc: Distributed Training

## Objective

In [these slides](https://www.slideshare.net/cxwangyi/paddlepaddle-a-complete-solution-for-businesses), we explained that we would like PaddlePaddle to run on general-purpose clusters, such as those managed by Kubernetes, in order to address the demand for AI from both Internet and non-Internet industries.

This poses technical challenges to PaddlePaddle:

1. Support fault-recovery.
1. Support both offline and online training.
1. [Serverless computing](https://en.wikipedia.org/wiki/Serverless_computing) of distributed training.


## Training Job

A training job is created once a user asks Paddle cloud to train a model. The training job is made up of different processes that collaboratively consume data and produce a trained model. There are three kinds of processes:

1. the *master process*, which dispatches tasks to
1. one or more *trainer processes*, which run distributed training and synchronize gradients/models via
1. one or more *parameter server processes*, each of which holds a shard of the global model and receives the gradients uploaded by every *trainer process*, so that it can run the optimization functions to update its parameters.

Their relationship is illustrated in the following graph:

<img src="src/paddle-model-sharding.png"/>

By coordinating these processes, PaddlePaddle supports both Synchronous Stochastic Gradient Descent (sync SGD) and Asynchronous Stochastic Gradient Descent (async SGD) for training user-defined neural network topologies.

When training with sync SGD, the parameter servers wait for all trainers to finish uploading their gradients and then send the updated parameters to the trainers. Training cannot proceed until the trainers have received the updated parameters, which creates a synchronization point between trainers. When training with async SGD, each trainer uploads gradients and downloads new parameters individually, without synchronizing with other trainers. Async SGD is faster in terms of time per pass, but its gradients are noisier because trainers are likely to be working with a stale model.
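
As a minimal sketch of the sync SGD synchronization point, the snippet below models one parameter server shard that buffers gradients until every trainer has reported and only then applies an update. The class name `SyncShard`, the averaging rule, and the learning rate are assumptions made for illustration; this design doc does not prescribe them.

```python
class SyncShard:
    """Hypothetical parameter server shard running sync SGD."""

    def __init__(self, params, num_trainers, lr=0.1):
        self.params = list(params)
        self.num_trainers = num_trainers
        self.lr = lr
        self.pending = {}  # trainer id -> gradient for the current mini-batch

    def push_gradient(self, trainer_id, grad):
        """Buffer a gradient; return new parameters once all trainers reported, else None."""
        self.pending[trainer_id] = list(grad)
        if len(self.pending) < self.num_trainers:
            return None  # synchronization point: still waiting for other trainers
        # Average the gradients across trainers (an assumed aggregation rule).
        for i in range(len(self.params)):
            avg = sum(g[i] for g in self.pending.values()) / self.num_trainers
            self.params[i] -= self.lr * avg
        self.pending.clear()
        return list(self.params)
```

In this sketch a trainer that receives `None` has to wait, which is the blocking behavior that async SGD avoids.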

### Master Process

The master process will:

- Partition a dataset into [tasks](#task) and dispatch tasks to trainers.
- Keep track of training progress on the dataset with the [task queue](#task-queue). A training job iterates over the dataset for a full pass before it moves on to the next pass.


#### Task

A task is a data shard to be trained on. The total number of tasks will be much larger than the total number of trainers, and the number of data instances in a task will be much larger than the mini-batch size.
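
The partitioning can be sketched as follows. This is only an illustration: the function name `partition_into_tasks`, the contiguous split, and the sizes in the usage note are assumptions, since the dataset format is still marked TODO in this document.

```python
def partition_into_tasks(num_instances, instances_per_task):
    """Split instance indices [0, num_instances) into contiguous tasks."""
    if instances_per_task <= 0:
        raise ValueError("instances_per_task must be positive")
    return [
        range(start, min(start + instances_per_task, num_instances))
        for start in range(0, num_instances, instances_per_task)
    ]
```

For example, `partition_into_tasks(1_000_000, 1_000)` yields 1,000 tasks of 1,000 instances each, which would satisfy both size constraints for a hypothetical job with 10 trainers and a mini-batch size of 32.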

#### Task Queue

The master process uses three task queues to track training progress. As illustrated in the graph below, Job A and Job B each have one master process, and each master process has three task queues.

<img src="src/paddle-task-queues.png"/>

- The todo queue holds tasks to be dispatched. When a job starts, the master process fills the todo queue with all tasks.
- The pending queue holds tasks that are currently being trained by trainers.
- The done queue holds tasks that have already been trained.

The life cycle of a single task is illustrated below:

<img src="src/paddle-task-states.png"/>

1. When a new pass of training starts, all tasks are placed in the todo queue.
1. The master process dispatches a few tasks to each trainer at a time, puts them in the pending queue, and waits for completion.
1. The trainer works on its tasks and tells the master process once a task is completed. The master process then dispatches a new task to that trainer.
1. If a task times out, the master process moves it back to the todo queue and increases its timeout count by one. If the timeout count exceeds a threshold, the task is likely to cause trainers to crash, so it is discarded.
1. The master process moves each completed task to the done queue. When the todo queue is empty, the master process starts a new pass by moving all tasks in the done queue to the todo queue and resetting the timeout count of every task to zero.
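
The life cycle above can be modeled with a small in-memory state machine. This is a sketch under stated assumptions, not the master's actual implementation: the class `TaskQueues`, its method names, and the default threshold of 3 are hypothetical, and a real master would keep these queues in etcd rather than in process memory.

```python
from collections import deque


class TaskQueues:
    """Hypothetical in-memory model of the todo, pending, and done queues."""

    def __init__(self, task_ids, max_timeouts=3):
        self.todo = deque(task_ids)
        self.pending = {}  # task id -> trainer id
        self.done = []
        self.timeouts = {t: 0 for t in task_ids}
        self.max_timeouts = max_timeouts  # assumed threshold

    def dispatch(self, trainer_id):
        """Move one task from todo to pending; return None if todo is empty."""
        if not self.todo:
            return None
        task = self.todo.popleft()
        self.pending[task] = trainer_id
        return task

    def finish(self, task):
        """Move a completed task from pending to done."""
        del self.pending[task]
        self.done.append(task)

    def timeout(self, task):
        """Requeue a timed-out task; return False if it was discarded instead."""
        del self.pending[task]
        self.timeouts[task] += 1
        if self.timeouts[task] > self.max_timeouts:
            del self.timeouts[task]  # the task is dropped for good
            return False
        self.todo.append(task)
        return True

    def start_new_pass(self):
        """Move all done tasks back to todo and reset the timeout counts."""
        self.todo.extend(self.done)
        self.done.clear()
        for task in self.timeouts:
            self.timeouts[task] = 0
```

A caller would likely also check that the pending queue is empty before calling `start_new_pass`, so that a pass does not restart while tasks are still in flight; the sketch leaves that decision to the caller.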

### Trainer Process

The trainer process will:

- Receive tasks from the master.
- Work on the tasks: calculate gradients and upload them to the parameter servers, and update the local model by downloading new parameters from the parameter servers.

### Parameter Server Process

Parameter server processes hold the parameters collaboratively. The parameters are partitioned across the parameter servers.

The parameter server will:

- Receive gradients from the trainers, update its parameters, and give the trainers the latest parameters.
- Periodically save its parameters to a distributed file system, overwriting the previous save.
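
This document does not specify how parameters are assigned to servers. One possible scheme, shown here purely as an assumption, is to hash each parameter name to a server index so that every trainer computes the same placement without coordination. The function names and the use of CRC32 are illustrative choices.

```python
import zlib


def shard_index(param_name, num_servers):
    """Map a parameter name to a server index with a stable hash (assumed scheme)."""
    return zlib.crc32(param_name.encode("utf-8")) % num_servers


def partition_parameters(param_names, num_servers):
    """Group parameter names by the server that would hold them."""
    shards = [[] for _ in range(num_servers)]
    for name in param_names:
        shards[shard_index(name, num_servers)].append(name)
    return shards
```

A stable hash such as CRC32 is used instead of Python's built-in `hash`, whose value for strings changes between interpreter runs.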

### Optimization Algorithms

The communication pattern between the trainers and the parameter servers depends on the category of optimization algorithm:

- Synchronous Stochastic Gradient Descent (sync-SGD)

	The parameter servers wait until all trainers have finished computing the n-th mini-batch and sent their gradients before broadcasting new parameters to every trainer. Every trainer waits for the new parameters before starting the (n+1)-th mini-batch.

- Asynchronous Stochastic Gradient Descent (async-SGD)

	There is no synchronization between different trainers, and a parameter server updates its parameters as soon as it receives a new gradient:

	- Each trainer uploads its accumulated gradient every n mini-batches.
	- Every m mini-batches, the trainer downloads new parameters from the parameter server.
	- n and m do not have to be equal.
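
To make the roles of n and m in async-SGD concrete, the sketch below lists when a single trainer would upload and download. The function name `async_schedule` and the event labels are hypothetical, and the ordering of an upload and a download that fall on the same mini-batch is an assumption.

```python
def async_schedule(num_batches, n, m):
    """Return the (batch, action) events of one trainer under async-SGD."""
    events = []
    for batch in range(1, num_batches + 1):
        if batch % n == 0:
            events.append((batch, "upload"))    # send the accumulated gradient
        if batch % m == 0:
            events.append((batch, "download"))  # fetch the latest parameters
    return events
```

With `n=2` and `m=3`, a trainer uploads after mini-batches 2, 4, and 6 but downloads only after mini-batches 3 and 6, so between downloads it computes gradients against a model that may already be stale.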

## Fault Tolerance

The training job will pause if the master process is dead, or if any of the parameter server processes is dead. They will be restarted by [Kubernetes](https://kubernetes.io/) and recover within a few minutes. Please refer to [fault recovery](#fault-recovery).

The training job will continue to make progress if there is at least one trainer process running. The strategy depends on the type of optimization algorithm:

- sync-SGD

	TODO

- async-SGD

	Since async-SGD does not require synchronization between mini-batches, the system will by definition make progress if at least one trainer is running.

## Fault Recovery

PaddlePaddle uses [etcd](https://github.com/coreos/etcd) to keep track of the states of processes. Because etcd is a distributed, reliable key-value store, a restarted process can recover its state from etcd. The model parameters are periodically saved to a distributed file system, so a restarted parameter server can recover its parameters from the saved file.

The following sections describe how each process recovers from a failure. The graph below shows how etcd is used:

<img src="src/paddle-etcd.png"/>

### Master Process

When the master is started by Kubernetes, it executes the following steps at startup:

1. Grabs a unique *master* lock in etcd, which prevents concurrent master instantiations.
1. Recovers the task queues from etcd if they already exist; otherwise, the master creates them.
1. Watches the trainer prefix keys `/trainer/` on etcd to find the live trainers.
1. Starts dispatching tasks to the trainers, and updates the task queues using an etcd transaction to ensure that the lock is held during the update.

When the master process dies for any reason, Kubernetes will restart it. It will be back online within a few minutes, with all of its state recovered from etcd.
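
The master startup sequence can be sketched against an in-memory stand-in for etcd. Everything here is an assumption made for illustration: `FakeKV` is not the etcd client API, and the key names `/master/lock` and `/master/queues` are hypothetical. Only the `/trainer/` prefix comes from this document.

```python
import json


class FakeKV:
    """In-memory stand-in for etcd; not the etcd client API."""

    def __init__(self):
        self.data = {}

    def put_if_absent(self, key, value):
        """Create a key only if it does not exist; return False otherwise."""
        if key in self.data:
            return False
        self.data[key] = value
        return True


def start_master(kv, master_id, all_tasks):
    """Take the master lock, then recover or create the task queues."""
    if not kv.put_if_absent("/master/lock", master_id):
        raise RuntimeError("another master holds the lock")
    if "/master/queues" not in kv.data:
        fresh = {"todo": list(all_tasks), "pending": [], "done": []}
        kv.data["/master/queues"] = json.dumps(fresh)
    queues = json.loads(kv.data["/master/queues"])
    # A real master would watch this prefix; the sketch only lists it once.
    trainers = sorted(k for k in kv.data if k.startswith("/trainer/"))
    return queues, trainers
```

In this sketch the lock never expires on its own. With etcd, the lock would presumably be tied to a lease so that it is released when the previous master dies, which is what lets the restarted master acquire it.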

### Trainer Process

When the trainer is started by Kubernetes, it executes the following steps at startup:

1. Watches the available parameter server prefix keys `/ps/` on etcd and waits until the count of parameter servers reaches the desired count.
1. Generates a unique ID, and sets key `/trainer/<unique ID>` with its contact address as the value. The key will be deleted when the trainer's lease expires, so the master is aware of the trainer going online and offline.
1. Waits for tasks from the master to start training.

If the trainer's etcd lease expires, it will try to set key `/trainer/<unique ID>` again so that the master process can discover the trainer again.

When a trainer fails, Kubernetes will try to restart it. The recovered trainer will fetch tasks from the todo queue and continue training.
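
The lease behavior behind trainer discovery can be illustrated with an in-memory store whose keys expire. This is a sketch, not the etcd client API: `LeasedKV`, its manually advanced clock, and the 10-second TTL are assumptions chosen to keep the example deterministic.

```python
import uuid


class LeasedKV:
    """In-memory stand-in for etcd keys with leases; not the etcd client API."""

    def __init__(self):
        self.now = 0.0     # manually advanced clock, in seconds
        self.entries = {}  # key -> (value, expiry time)

    def put(self, key, value, ttl):
        self.entries[key] = (value, self.now + ttl)

    def live_keys(self, prefix):
        """Return the unexpired keys under a prefix, as a master watch would see them."""
        return sorted(
            k for k, (_, expiry) in self.entries.items()
            if k.startswith(prefix) and expiry > self.now
        )


def register_trainer(kv, address, ttl=10.0, trainer_id=None):
    """Set /trainer/<unique ID> to the contact address under a lease."""
    trainer_id = trainer_id or uuid.uuid4().hex
    key = "/trainer/" + trainer_id
    kv.put(key, address, ttl)
    return key
```

If the clock passes the expiry time without a renewal, the key disappears from `live_keys("/trainer/")`, and calling `register_trainer` again with the same ID makes the trainer visible to the master once more.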

### Parameter Server Process

When the parameter server is started by Kubernetes, it executes the following steps at startup:

1. Reads the desired total number of parameter servers from etcd key `/ps_desired`.
1. Searches through etcd keys `/ps/<index>` (`/ps/0`, `/ps/1`, ...) to find the first non-existent key whose index is smaller than the total number of parameter servers, and sets that key using a transaction to avoid concurrent writes. The parameter server's index is inferred from the key name.

	The desired number of parameter servers is 3:

	<img src="src/paddle-ps-0.png"/>

	The third parameter server joined:

	<img src="src/paddle-ps-1.png"/>

1. The parameter server loads the saved parameters if any exist in the save path (inferred from its index).
1. Now the parameter server is ready for the trainers' requests.

If the parameter server's etcd lease expires, the parameter server will kill itself.
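
The index selection step can be sketched with a plain dictionary standing in for etcd. The function name `claim_ps_index` is hypothetical, and the check-then-write below is only safe in a single process; against etcd, the check and the write would have to happen in one transaction, as described above.

```python
def claim_ps_index(store, address):
    """Claim the first free /ps/<index> below the desired count; return the index."""
    desired = int(store["/ps_desired"])
    for index in range(desired):
        key = "/ps/%d" % index
        # With etcd, this check and write must be a single transaction.
        if key not in store:
            store[key] = address
            return index
    return None  # every slot is already taken
```

For the situation in the figures, where `/ps_desired` is 3 and `/ps/0` and `/ps/1` already exist, the joining server claims index 2. Because the first free index is reused, a restarted server takes over the slot, and therefore the save path, of the server it replaces.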


## Parameter Server Checkpointing
See [here](./checkpointing.md)

## Storing and Dispatching Training Data
See [here](./data_dispatch.md)


## Dynamic Scaling

### Trainer Scaling

TODO

### Parameter Server Scaling

Not planned for v1.

## Training Dataset Format

TODO

## User Interface

TODO