# Distributed Training Design Doc

## Objective

We want Paddle to support training on general-purpose clusters. Such a cluster runs Paddle alongside web servers (e.g., Nginx), log collectors (e.g., fluentd), distributed queue services (e.g., Kafka), log joiners, and other data processors written using Storm, Spark, and Hadoop MapReduce, as illustrated in the following graph:

<img src="src/arch.png"/>

This poses new challenges for Paddle:

- Paddle needs to be fault tolerant.
- Input training data can be online data from real-time logs or batch data from a distributed file system.
- Users need a simple way to train a model on Paddle cloud. Complexities such as job scheduling should be hidden from the user.

## Training Job

A training job will be created once a user asks Paddle cloud to train a model. The training job is made up of different processes that collaboratively consume data and produce a trained model. There are three kinds of processes:

- Master process
- Trainer process
- Parameter server process

One training job has exactly one master process, and typically multiple trainer processes and parameter server processes. Their relation is illustrated in the following graph:

<img src="src/paddle-model-sharding.png" height="400"/>

### Master Process

The master process will:

- Do [parameter server selection](#parameter-server-selection).
- Shard the dataset into [tasks](#task) and dispatch them to trainers.
- Keep track of training progress on the dataset with the [task queue](#task-queue). A training job iterates over the dataset one full pass at a time before moving on to the next pass.

Now we will explain the concepts mentioned above:

#### Selection Request

The selection request is a request that the master sends to a parameter server candidate, promoting it to a parameter server available to the trainers. It contains information such as the parameter server index, the optimization algorithm, the parameter save period, and the path for saving parameters.
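
A minimal sketch of what such a request might carry, written in Go; the field names are illustrative and not the actual Paddle message definition:

```go
// Package design sketches the messages and data structures described in this
// document; everything here is illustrative, not the real Paddle code.
package design

import "time"

// SelectionRequest is what the master sends to a parameter server candidate
// to promote it to a parameter server that trainers can see.
type SelectionRequest struct {
	Index        int           // position in the consistent parameter server ordering
	OptAlgorithm string        // optimization algorithm the parameter server should run
	SavePeriod   time.Duration // how often to checkpoint parameters
	SavePath     string        // distributed file system path for saved parameters
}
```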

#### Parameter Server Selection

The parameter server selection process selects parameter servers from the parameter server candidates. It ensures that all trainers see the parameter servers in a consistent order, since a trainer decides parameter placement according to that order.

The selection process is as follows:

- The master watches the `/ps_candidate/` prefix in etcd. When a parameter server candidate joins and there are not enough parameter servers, the master will remove the candidate's entry from `/ps_candidate/` and send a [selection request](#selection-request) to the candidate. Upon receiving the request, the candidate will set key `/ps/<index>` in etcd with a lease to make itself available to the trainers. The `<index>` comes from the selection request. (The master's side of this step is sketched after the figures below.)

	The graph below shows a parameter server candidate coming online and then being selected, becoming available to the trainers:
	
	<img src="src/paddle-ps-0-can.png" width="650"/>
	<img src="src/paddle-ps-0-sel.png" width="650"/>

- The master watches the `/ps/` prefix in etcd. When a selected parameter server goes offline, the master will select a candidate that has not yet been selected by sending it a selection request, filling the missing parameter server spot.

	The graph below shows what happens when one parameter server goes missing: the cluster management system creates a new parameter server, the new parameter server announces itself as a candidate, and the master fills the missing parameter server spot with the new candidate.

	<img src="src/paddle-ps-new-can.png" width="650"/>
	<img src="src/paddle-ps-new-sel.png" width="650"/>


#### Task 

A task is a piece of sharded data to be trained. The total number of tasks will be much bigger than the total number of trainers. The number of data instances inside a task will be much bigger than the mini-batch size.
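
One possible in-memory representation of a task, purely for illustration; the real master may describe data shards differently (for example, as chunk ranges within files on the distributed file system):

```go
// Illustrative task representation, in the same sketch package as above.
package design

// Chunk points at a contiguous piece of the training data.
type Chunk struct {
	Path  string // file path on the distributed file system
	Begin int64  // byte offset where this chunk starts
	End   int64  // byte offset where this chunk ends
}

// Task is the unit of dispatch: a shard of the dataset holding many more
// data instances than one mini-batch.
type Task struct {
	ID      int
	Chunks  []Chunk
	Timeout int // how many times this task has timed out
}
```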

#### Task Queue

The master process has three task queues to track training progress. As illustrated in the graph below, Job A and Job B each have one master process, and each master process has three task queues.

<img src="src/paddle-task-queues.png" height="400"/>

- The todo queue holds tasks to be dispatched. When a job starts, the master process fills the todo queue with all tasks.
- The pending queue holds tasks that are currently being trained by trainers.
- The done queue holds tasks that have already been trained.

The life cycle of a single task is illustrated below:

<img src="src/paddle-task-states.png" height="400"/>

1. When a new pass of training starts, all tasks will be placed in the todo queue.
1. The master process will dispatch a few tasks to each trainer at a time, put them in the pending queue, and wait for their completion.
1. The trainer will work on its tasks and tell the master process once a task is completed. The master process will then dispatch a new task to that trainer.
1. If a task times out, the master process will move it back to the todo queue and increase its timeout count by one. If the timeout count is above a threshold, the task is likely causing trainers to crash, so it will be discarded.
1. The master process will move each completed task to the done queue. When the todo queue is empty, the master process will start a new pass by moving all tasks in the done queue back to the todo queue and resetting the timeout counter of every task to zero. (These queue transitions are sketched below.)
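
The queue transitions above can be sketched as follows, reusing the illustrative `Task` type from the earlier sketch; persisting the queues to etcd and per-trainer bookkeeping are omitted, and the threshold value is made up:

```go
// In-memory sketch of the master's three task queues, same sketch package.
package design

const timeoutThreshold = 3 // illustrative; tasks failing more often are discarded

// taskQueues tracks one pass over the dataset.
type taskQueues struct {
	todo    []Task
	pending map[int]Task // keyed by task ID
	done    []Task
}

// dispatch moves one task from todo to pending and hands it to a trainer.
func (q *taskQueues) dispatch() (Task, bool) {
	if len(q.todo) == 0 {
		return Task{}, false
	}
	t := q.todo[0]
	q.todo = q.todo[1:]
	q.pending[t.ID] = t
	return t, true
}

// complete moves a finished task from pending to done.
func (q *taskQueues) complete(id int) {
	if t, ok := q.pending[id]; ok {
		delete(q.pending, id)
		q.done = append(q.done, t)
	}
}

// timeout moves a stuck task back to todo, discarding it past the threshold.
func (q *taskQueues) timeout(id int) {
	t, ok := q.pending[id]
	if !ok {
		return
	}
	delete(q.pending, id)
	t.Timeout++
	if t.Timeout <= timeoutThreshold {
		q.todo = append(q.todo, t)
	}
}

// newPass starts the next pass once todo is drained: done tasks go back to
// todo with their timeout counters reset.
func (q *taskQueues) newPass() {
	for _, t := range q.done {
		t.Timeout = 0
		q.todo = append(q.todo, t)
	}
	q.done = nil
}
```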

### Trainer Process

The trainer process will:

- Receive the tasks from the master.
- Work on the tasks: calculate and upload gradients to the parameter servers, and update the local model by downloading new parameters from the parameter servers. (The main loop is sketched below.)
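
A sketch of the trainer's main loop; `getTask`, `computeGradients`, `sendGradients`, `pullParameters`, and `reportDone` are stand-ins for the real master and parameter server RPCs, and `Task` is the illustrative type from the task sketch:

```go
// Trainer main loop, same sketch package as above.
package design

// Placeholder types and helpers standing in for the real RPC layer; all of
// these names are illustrative.
type MiniBatch struct{}
type Gradients struct{}

func getTask() (Task, bool)                  { return Task{}, false }
func miniBatches(t Task) []MiniBatch         { return nil }
func computeGradients(b MiniBatch) Gradients { return Gradients{} }
func sendGradients(g Gradients)              {}
func pullParameters()                        {}
func reportDone(id int)                      {}

// trainLoop keeps asking the master for tasks and reports each one when done.
func trainLoop() {
	for {
		task, ok := getTask() // in the real system, blocks until the master dispatches a task
		if !ok {
			return // no task available; this pass is over
		}
		for _, batch := range miniBatches(task) {
			grads := computeGradients(batch) // forward + backward pass on one mini-batch
			sendGradients(grads)             // upload gradients to the parameter servers
			pullParameters()                 // refresh the local model with new parameters
		}
		reportDone(task.ID) // tell the master, which will dispatch a new task
	}
}
```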

### Parameter Server Process

Parameter server processes hold the parameters collaboratively. The parameters are sharded across different parameter servers.

The parameter server will:

- Receive gradients from the trainers, update its parameters, and give the trainers the latest parameters.
- Periodically save its parameters to a distributed file system, overwriting the previous save. (A sketch of the update and save loop follows.)
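
A sketch of one parameter server shard applying plain SGD updates and periodically checkpointing; plain SGD, the serialization helper `encode`, and the locking scheme are assumptions, and the RPC layer is omitted:

```go
// Parameter server shard sketch, same sketch package as above.
package design

import (
	"os"
	"sync"
	"time"
)

// paramShard holds the slice of parameters owned by this parameter server.
type paramShard struct {
	mu     sync.Mutex
	params []float32
}

// applyGradient updates the shard with one gradient (assumed to be the same
// length as the shard) and returns the latest parameters for the trainer.
func (p *paramShard) applyGradient(grad []float32, lr float32) []float32 {
	p.mu.Lock()
	defer p.mu.Unlock()
	for i := range grad {
		p.params[i] -= lr * grad[i]
	}
	out := make([]float32, len(p.params))
	copy(out, p.params)
	return out
}

// saveLoop overwrites the previous checkpoint at savePath every period.
func (p *paramShard) saveLoop(savePath string, period time.Duration) {
	for range time.Tick(period) {
		p.mu.Lock()
		data := encode(p.params) // placeholder for real serialization
		p.mu.Unlock()
		_ = os.WriteFile(savePath, data, 0644) // errors ignored in this sketch
	}
}

func encode(params []float32) []byte { return nil } // placeholder
```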

### Optimization Algorithms

The communication pattern between the trainers and the parameter servers depends on the category of optimization algorithm:

- Synchronous Stochastic Gradient Descent (sync-SGD)

	The parameter server will wait for all trainers to finish the n-th mini-batch calculation and send their gradients before broadcasting new parameters to every trainer. Every trainer will wait for the new parameters before starting the (n+1)-th mini-batch.
  
- Asynchronous Stochastic Gradient Descent (async-SGD)

	There will be no synchronization between different trainers, and the parameter server updates its parameters as soon as it receives a new gradient (a trainer-side sketch of this cadence follows the list):

	- Each trainer uploads its accumulated gradient every **n** mini-batches.
	- Every **m** mini-batches, the trainer downloads new parameters from the parameter servers.
	- **n** and **m** do not have to be equal.
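
A trainer-side sketch of this cadence, reusing the placeholder helpers from the trainer sketch; the accumulation strategy shown here is only one possible reading of "accumulated gradient":

```go
// Async-SGD trainer cadence, same sketch package as above.
package design

// asyncTrainLoop processes one task: gradients are accumulated and uploaded
// every n mini-batches, and new parameters are downloaded every m
// mini-batches; n and m are independent.
func asyncTrainLoop(task Task, n, m int) {
	var accumulated []Gradients
	for i, batch := range miniBatches(task) {
		accumulated = append(accumulated, computeGradients(batch))
		if (i+1)%n == 0 {
			// Upload the accumulated gradients every n mini-batches.
			for _, g := range accumulated {
				sendGradients(g)
			}
			accumulated = accumulated[:0]
		}
		if (i+1)%m == 0 {
			// Download new parameters every m mini-batches.
			pullParameters()
		}
	}
}
```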

## Fault Tolerance

The training job will pause if the master process or any of the parameter server processes dies. The dead process will be restarted by the cluster management system and recover within a few minutes. Please refer to [fault recovery](#fault-recovery).

The training job will continue to make progress as long as at least one trainer process is running. The strategy depends on the type of optimization algorithm:

- sync-SGD

	TODO

- async-SGD

	Since async-SGD does not require synchronization between mini-batches, the system will by definition make progress if at least one trainer is running.

## Fault Recovery

PaddlePaddle uses [etcd](https://github.com/coreos/etcd) to keep track of the states of processes. Because etcd is a distributed, reliable key-value store, a restarted process can recover its state from etcd. The model parameters are periodically saved to a distributed file system, so a restarted parameter server can recover its parameters from the saved files.

Now we will introduce how each process recovers from failure:

<img src="src/paddle-etcd.png" width="650"/>

### Master Process

When the master is started by the cluster management system, it executes the following steps at startup:

1. Grabs a unique *master* lock in etcd, which prevents concurrent master instantiations.
1. Recovers the task queues from etcd if they already exist; otherwise, it creates them.
1. Watches the trainer prefix keys `/trainer/` on etcd to find the live trainers.
1. Starts dispatching the tasks to the trainers; the whole startup sequence is sketched below.
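
A sketch of this startup sequence, assuming the etcd v3 Go client and its `concurrency` package; the lock key is illustrative, and queue recovery and task dispatching are stubbed out:

```go
// Master startup sketch: grab the lock, watch trainers, then dispatch.
package main

import (
	"context"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	ctx := context.Background()

	// 1. Grab the unique master lock so only one master instance runs.
	session, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	lock := concurrency.NewMutex(session, "/master_lock")
	if err := lock.Lock(ctx); err != nil {
		log.Fatal(err)
	}

	// 2. Recover the task queues from etcd, or create them (stubbed out).

	// 3. Watch /trainer/ to learn which trainers are alive.
	trainers := cli.Watch(ctx, "/trainer/", clientv3.WithPrefix())

	// 4. Dispatch tasks as trainers come and go (dispatch itself is stubbed out).
	for resp := range trainers {
		for _, ev := range resp.Events {
			log.Printf("trainer event %s on %s", ev.Type, ev.Kv.Key)
		}
	}
}
```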

The master process will kill itself if its etcd lease expires.

If the master process dies for any reason, the cluster management system will restart it. It will be online again with all states recovered from etcd within a few minutes.

### Trainer Process

When the trainer is started by the cluster management system, it executes the following steps at startup:

1. Watches the parameter server prefix keys `/ps/` on etcd and waits until the number of available parameter servers reaches the desired count.
1. Generates a unique ID and sets key `/trainer/<unique ID>` with its contact address as the value. The key is attached to a lease and will be deleted when the lease expires, so the master will be aware of the trainer going online and offline.
1. Waits for tasks from the master to start training. (These startup steps are sketched below.)
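
A sketch of the trainer's registration, assuming the etcd v3 Go client and the `github.com/google/uuid` package for the unique ID; the contact address, lease TTL, and desired parameter server count are illustrative:

```go
// Trainer startup sketch: wait for parameter servers, then register.
package main

import (
	"context"
	"log"
	"time"

	"github.com/google/uuid"
	clientv3 "go.etcd.io/etcd/client/v3"
)

const desiredPServers = 4 // illustrative

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	ctx := context.Background()

	// 1. Wait until enough parameter servers are registered under /ps/.
	for {
		resp, err := cli.Get(ctx, "/ps/", clientv3.WithPrefix())
		if err == nil && resp.Count >= desiredPServers {
			break
		}
		time.Sleep(time.Second)
	}

	// 2. Register under /trainer/<unique ID> with a lease so the key
	//    disappears if this trainer dies.
	lease, err := cli.Grant(ctx, 10) // 10-second TTL, illustrative
	if err != nil {
		log.Fatal(err)
	}
	id := uuid.New().String()
	if _, err := cli.Put(ctx, "/trainer/"+id, "10.0.0.7:7164", clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}
	keepAlive, err := cli.KeepAlive(ctx, lease.ID) // renew the lease while alive
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		for range keepAlive { // drain keep-alive responses
		}
	}()

	// 3. Wait for tasks from the master and start training (omitted).
	select {}
}
```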

If the trainer's etcd lease expires, it will try to set key `/trainer/<unique ID>` again so that the master process can rediscover it.

### Parameter Server Process

When the parameter server is started by the cluster management system, it executes the following steps at startup:

1. Generates a unique ID, sets key `/ps_candidate/<unique ID>` with its contact address as the value, and waits for the master's [selection request](#selection-request).
1. Upon receiving the master's [selection request](#selection-request), loads parameters if there are already saved parameters in the save path from the selection request, then creates key `/ps/<index>` with its contact address as the value.
1. Now the parameter server is ready to serve the trainers' requests. (These startup steps are sketched below.)
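
A sketch of the parameter server's registration, under the same client assumptions as the trainer sketch; `waitForSelection`, the contact address, and the save path are placeholders for the real selection-request RPC and configuration:

```go
// Parameter server startup sketch: announce candidacy, wait for selection,
// then publish /ps/<index>.
package main

import (
	"context"
	"log"

	"github.com/google/uuid"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// waitForSelection stands in for the selection-request RPC handler; the
// returned index and save path are made up.
func waitForSelection() (index, savePath string) { return "0", "/pfs/checkpoint" }

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	ctx := context.Background()

	// 1. Announce this process as a candidate under /ps_candidate/<unique ID>,
	//    bound to a lease so the entry vanishes if the process dies.
	lease, err := cli.Grant(ctx, 10) // 10-second TTL, illustrative
	if err != nil {
		log.Fatal(err)
	}
	id := uuid.New().String()
	addr := "10.0.0.8:7164" // illustrative contact address
	if _, err := cli.Put(ctx, "/ps_candidate/"+id, addr, clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}

	// 2. Wait for the master's selection request; it carries the index and
	//    the save path from which previously saved parameters can be loaded.
	index, savePath := waitForSelection()
	_ = savePath // load parameters here if a checkpoint exists

	// 3. Publish /ps/<index> so trainers can find this parameter server.
	if _, err := cli.Put(ctx, "/ps/"+index, addr, clientv3.WithLease(lease.ID)); err != nil {
		log.Fatal(err)
	}
}
```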

If the parameter server's etcd lease expires, the parameter server will save its parameters to the given save path and kill itself.


## Dynamic Scaling

### Trainer Scaling

TODO

### Parameter Server Scaling

Not planned for v1.

## Training Dataset Format

TODO

## User Interface

TODO