diff --git a/doc/design/dist/README.md b/doc/design/dist/README.md index 76924f7e1d335a3b00e6a588ed2fdacffe39425c..8bb1ac850413284ab44feffa4084f9165f40aae7 100644 --- a/doc/design/dist/README.md +++ b/doc/design/dist/README.md @@ -2,7 +2,7 @@ ## Objective -We want Paddle to support training on the general-purpose cluster. The cluster runs Paddle, the Web server (e.g., Nginx), the log collector (e.g., fluentd), the distributed queue service (e.g., Kafka), the log joiner and other data processors written using Storm, Spark, and Hadoop MapReduce on the same cluster. As illustrated in the following graph: +We want Paddle to support training on the general-purpose cluster. The cluster runs Paddle, the web server (e.g., Nginx), the log collector (e.g., fluentd), the distributed queue service (e.g., Kafka), the log joiner and other data processors written using Storm, Spark, and Hadoop MapReduce on the same cluster. As illustrated in the following graph: @@ -22,55 +22,167 @@ A training job will be created once user asks Paddle cloud to train a model. The One training job will only have one master process, typically multiple trainer processes and parameter server processes. Their relation is illustrated in the following graph: - + ### Master Process -Master process will: +The master process will: -- Keep a list of alive trainers and a list of alive parameter servers and do health check. - - If a trainer is dead it will update the task queue accordingly as mentioned in [task queue](#task-queue). - - If a parameter server is dead or a new parameter server joins, it will broadcast this information to all trainers. -- Dispatches tasks to trainers. A *task* is a unit of data that a trainer needs to train on. -- Keep track of training progress on the dataset with *task queue*. A training job will iterate on the dataset for a full pass until it goes into next pass. +- Do [parameter server selection](#parameter-server-selection). +- Shard dataset into [tasks](#task) and dispatch tasks to trainers. +- Keep track of training progress on the dataset with [task queue](#task-queue). A training job will iterate on the dataset for a full pass until it goes into next pass. + +Now we will explain the concepts mentioned above: + +#### Selection Request + +The selection request is a request that the master sends to a parameter server candidate, making it a parameter server available to the trainers. It contains information such as the parameter server index, the optimizaiton algorithm, the parameter save period, and the path for saving parameters. + +#### Parameter Server Selection + +The parameter server selection process selects parameter servers from parameter server candidates. It ensures the parameter servers that the trainers see are in consistent order, since the trainer needs to decide the parameter placement according to the consistent order. + +The selection process is as follows: + +- The master watches `/ps_candidate/` prefix in etcd. When a parameter server candidate joins and there is not enough parameter servers, the master will remove the candidate's entry in `/ps_candidate/` and send a [selection reqeust](#selection-request) to the candidate. Upon receiving the request, the candidate will set key `/ps/` in etcd with a lease to make itself available for the trainers. The `` is from the selection request. + + The graph below shows a parameter server candidate come online and then being selected, available for the trainers: + + + + +- The master watches `/ps/` prefix in etcd. When a selected parameter server went offline, the master will select a not yet selected parameter server candidate by sending the selection request to fill the missing parameter server spot. + + The graph below shows one parameter server is missing, the cluster management system created a new parameter server. The new parameter server announced itself as a candidate. Then the master filled the missing parameter server spot with the new candidate. + + + + + +#### Task + +A task is a piece of sharded data to be trained. The total number of tasks will be much bigger than the total number of trainers. The number of data instances inside a task will be much bigger than the mini-batch size. #### Task Queue -Master process has three task queues to track training progress as shown in the graph below: +Master process has three task queues to track training progress. As illustrated in the graph below, Job A and Job B both have one master process. Each master process has three task queues. - + - The todo queue holds tasks to be dispatched. When a job starts, the master process fills in the todo queue with all tasks. -- The pending queue holds tasks that are currently training by trainers, and a mapping from trainers to their training tasks. +- The pending queue holds tasks that are currently training by trainers. - the done queue holds tasks that are already trained. -A dataset will be sharded into tasks and dispatched by the master process. The life cycle of a single task is illustrated below: +The life cycle of a single task is illustrated below: - + 1. When a new pass of training starts, all tasks will be placed in the todo queue. 1. The master process will dispatch few tasks to each trainer at a time, puts them in the pending queue and waits for completion. 1. The trainer will work on it's tasks and tell the master process once a task is completed. The master process will dispatch a new task to that trainer. -1. If a trainer is dead. the master process will move it's tasks back to the todo queue. -1. The master process will move completed task to the done queue. When the todo queue is empty, the master process will start a new pass by moving all tasks in the done queue to todo queue. +1. If a task timeout. the master process will move it back to the todo queue. The timeout count will increase by one. If the timeout count is above an threashold, the task is likely to cause a trainer to crash, so it will be discarded. +1. The master process will move completed task to the done queue. When the todo queue is empty, the master process will start a new pass by moving all tasks in the done queue to todo queue and resetting the timeout counter of all tasks to zero. ### Trainer Process -The trainer process will train its current tasks, tell parameter servers its accumulated gradient, and download the latest model from parameter servers. +The trainer process will: -The trainer holds entire network model while each parameter server holds a shard of the model. So trainer needs to communicate will all parameter servers. +- Receive the tasks from the master. +- Work on the tasks: alculate and upload gradient to the parameter servers, and update local model by downloading new parameters from the parameter servers. -Communication involves two parts: +### Parameter Server Process -- Upload accumulated gradient. Upload can be configured to happen every **n** mini-batches. -- Download new model. Download can be configured to happen every **m** mini-batches. **n** and **m** do not have to be equal. +Parameter server processes hold the parameters collabratively. The parameters are sharded on different parameter servers. -### Parameter Server Process +The parameter server will: + +- Receive gradient from the trainers, update its parameters, and give the trainers the latest parameters. +- Periodically save its parameters to distributed file system by overriding the previous save. + +### Optimization Algorithms + +The communication pattern between the trainers and the parameter servers depends on the category of optimization algorithm: + +- Synchronous Stochastic Gradient Decent (sync-SGD) + + Parameter server will wait for all trainer finish n-th mini-batch calculation and send their gradients before broadcasting new parameters to every trainer. Every trainer will wait for the new parameters before starting n+1-th mini-batch. + +- Asynchronous Stochastic Gradient Decent (async-SGD) -Parameter server processes hold model together. Since model parameters are sharded and saved on different parameter servers. All parameter servers collabratively form the global view of a trained model. + There will no synchronization between different trainers, and parameter server updates its parameter as soon as it receives new gradient: + + - Each trainer uploads its accumulated gradient every **n** mini-batches. + - Every **m** mini-batches, the trainer downloads new parameters from parameter server. + - **n** and **m** do not have to be equal. ## Fault Tolerant +The training job will pause if the master process is dead, or any of the parameter server process is dead. They will be started by the cluster management system and recover in few minutes. Please refer to [fault recovery](#fault-recovery). + +The training job will continue to make progress if there is at least one training process running. The strategy depends on the type of optimization algorithm: + +- sync-SGD + + TODO + +- async-SGD + + Since async-SGD does not require synchronization between mini-batches, the system will by definition make process if at least one trainer is running. + +## Fault Recovery + +PaddlePaddle uses [etcd](https://github.com/coreos/etcd) to keep track of the states of processes. Because etcd is a distributed reliable key-value store, the restarted process can recover its states from etcd. The model parameter are periodically saved into distributed file system, so a restarted parameter server can recover its parameters from the saved file. + +Now we will introduce how each process recovers from failure: + + + +### Master Process + +When the master is started by the cluster management system, it executes the following steps at startup: + +1. Grabs a unique *master* lock in etcd, which prevents concurrent master instantiations. +1. Recovers the task queues from etcd if they already exists, otherwise the master will create them. +1. Watches the trainer prefix keys `/trainer/` on etcd to find the live trainers. +1. Starts dispatching the tasks to the trainers. + +The master process will kill itself if its etcd lease expires. + +When the master process is dead for any reason, the cluster management system will restart it. It will be online again with all states recovered from etcd in few minutes. + +### Trainer Process + +When the trainer is started by the cluster management system, it executes the following steps at startup: + +1. Watches the available parameter server prefix keys `/ps/` on etcd and waits until count of parameter servers reaches the desired count. +1. Generates an unique ID, and sets key `/trainer/` with its contact address as value. The key will be deleted when the lease expires, so the master will be aware of the trainer being online and offline. +1. Waits for tasks from the master to start training. + +If trainer's etcd lease expires, it will try set key `/trainer/` again so that the master process can discover the trainer again. + +### Parameter Server Process + +When the parameter server is started by the cluster management system, it executes the following steps at startup: + +1. Generates an unique ID, and sets key `/ps_candidate/` with its contact address as value and waits for the master's [selection request](#selection-request) +1. Upon receiving master server's [selection request](#selection-request). The parameter server can load parameters if there are already saved parameters in the save path from selection request. Then Creates key `/ps/` with its contact address as value. +1. Now the parameter server is ready for the trainers' requests. + +If the parameter server's etcd lease expires, the parameter server will save its parameters to the given save path and kill itself. + + +## Dynamic Scaling + +### Trainer Scaling + +TODO + +### Parameter Server Scaling + +Not planned for v1. + +## Training Dataset Format + TODO ## User Interface diff --git a/doc/design/dist/src/paddle-etcd.graffle b/doc/design/dist/src/paddle-etcd.graffle new file mode 100644 index 0000000000000000000000000000000000000000..501bb8f98f4c3fb072b63498b0b35d2e5e6ff821 Binary files /dev/null and b/doc/design/dist/src/paddle-etcd.graffle differ diff --git a/doc/design/dist/src/paddle-etcd.png b/doc/design/dist/src/paddle-etcd.png new file mode 100644 index 0000000000000000000000000000000000000000..0e9582ded16d2ecc746ef3225c3f3ebb5d18087d Binary files /dev/null and b/doc/design/dist/src/paddle-etcd.png differ diff --git a/doc/design/dist/src/paddle-model-sharding.graffle b/doc/design/dist/src/paddle-model-sharding.graffle new file mode 100644 index 0000000000000000000000000000000000000000..fba30f0ca2b47f0d202a432821d95e55aac37ec8 Binary files /dev/null and b/doc/design/dist/src/paddle-model-sharding.graffle differ diff --git a/doc/design/dist/src/paddle-model-sharding.png b/doc/design/dist/src/paddle-model-sharding.png new file mode 100644 index 0000000000000000000000000000000000000000..8c3f6724ef46c6527e63a4cd8cb0b50fe0167124 Binary files /dev/null and b/doc/design/dist/src/paddle-model-sharding.png differ diff --git a/doc/design/dist/src/paddle-on-kubernetes-invited-blog-model-sharding.graffle b/doc/design/dist/src/paddle-on-kubernetes-invited-blog-model-sharding.graffle deleted file mode 100644 index 4a0e7d9951ca24caa4983cd8cfe4aa4fea627020..0000000000000000000000000000000000000000 Binary files a/doc/design/dist/src/paddle-on-kubernetes-invited-blog-model-sharding.graffle and /dev/null differ diff --git a/doc/design/dist/src/paddle-on-kubernetes-invited-blog-model-sharding.png b/doc/design/dist/src/paddle-on-kubernetes-invited-blog-model-sharding.png deleted file mode 100644 index 6da08ebca645e776afd80e5c045bb6ea96d87135..0000000000000000000000000000000000000000 Binary files a/doc/design/dist/src/paddle-on-kubernetes-invited-blog-model-sharding.png and /dev/null differ diff --git a/doc/design/dist/src/paddle-ps-0-can.png b/doc/design/dist/src/paddle-ps-0-can.png new file mode 100644 index 0000000000000000000000000000000000000000..064469c5fc6ecee9cc7c0814974795ccaac5139f Binary files /dev/null and b/doc/design/dist/src/paddle-ps-0-can.png differ diff --git a/doc/design/dist/src/paddle-ps-0-sel.png b/doc/design/dist/src/paddle-ps-0-sel.png new file mode 100644 index 0000000000000000000000000000000000000000..fe771850956e3f13089517a04c5077e2a587fa06 Binary files /dev/null and b/doc/design/dist/src/paddle-ps-0-sel.png differ diff --git a/doc/design/dist/src/paddle-ps-etcd.graffle b/doc/design/dist/src/paddle-ps-etcd.graffle new file mode 100644 index 0000000000000000000000000000000000000000..19705a5cdbc1f87b48dc5bffa5ad4d056b94b3d4 Binary files /dev/null and b/doc/design/dist/src/paddle-ps-etcd.graffle differ diff --git a/doc/design/dist/src/paddle-ps-new-can.png b/doc/design/dist/src/paddle-ps-new-can.png new file mode 100644 index 0000000000000000000000000000000000000000..2e6c4d1eca4f6266fd15f30a2765850be6bef151 Binary files /dev/null and b/doc/design/dist/src/paddle-ps-new-can.png differ diff --git a/doc/design/dist/src/paddle-ps-new-sel.png b/doc/design/dist/src/paddle-ps-new-sel.png new file mode 100644 index 0000000000000000000000000000000000000000..65d06e21f9774e71ac92462173cb41f65d44f99e Binary files /dev/null and b/doc/design/dist/src/paddle-ps-new-sel.png differ diff --git a/doc/design/dist/src/paddle-task-queues.graffle b/doc/design/dist/src/paddle-task-queues.graffle index bd109a380523b8964458a7ef03ee5b09ad535475..957d1c7340275dc306b35541a319f9af5307e231 100644 Binary files a/doc/design/dist/src/paddle-task-queues.graffle and b/doc/design/dist/src/paddle-task-queues.graffle differ diff --git a/doc/design/dist/src/paddle-task-queues.png b/doc/design/dist/src/paddle-task-queues.png index fc3f5cd33db580cf87b3fd80c575ebb05d20da80..995f5819d4f1311b806def8410cdeb943408844f 100644 Binary files a/doc/design/dist/src/paddle-task-queues.png and b/doc/design/dist/src/paddle-task-queues.png differ diff --git a/doc/design/dist/src/paddle-task-states.graffle b/doc/design/dist/src/paddle-task-states.graffle index 7034115ca8ed08ad768d19940036d93250a9c319..cf1a0b9246d9386a949d2dbb8c32fe84f72eea83 100644 Binary files a/doc/design/dist/src/paddle-task-states.graffle and b/doc/design/dist/src/paddle-task-states.graffle differ diff --git a/doc/design/dist/src/paddle-task-states.png b/doc/design/dist/src/paddle-task-states.png index b647a503bb867f19d72ff1f522b35b59b246cc40..4ae43cb66c071aee9eb90d875e2373b29af9c3e0 100644 Binary files a/doc/design/dist/src/paddle-task-states.png and b/doc/design/dist/src/paddle-task-states.png differ