diff --git a/doc/design/dist/README.md b/doc/design/dist/README.md index 8bb1ac850413284ab44feffa4084f9165f40aae7..321398ce160322401478f4c78f5c6ab4c4e33f17 100644 --- a/doc/design/dist/README.md +++ b/doc/design/dist/README.md @@ -1,4 +1,4 @@ -# Distributed Training Design Doc +# Design Doc: Distributed Training ## Objective @@ -22,43 +22,17 @@ A training job will be created once user asks Paddle cloud to train a model. The One training job will only have one master process, typically multiple trainer processes and parameter server processes. Their relation is illustrated in the following graph: - + ### Master Process The master process will: -- Do [parameter server selection](#parameter-server-selection). - Shard dataset into [tasks](#task) and dispatch tasks to trainers. - Keep track of training progress on the dataset with [task queue](#task-queue). A training job will iterate on the dataset for a full pass until it goes into next pass. Now we will explain the concepts mentioned above: -#### Selection Request - -The selection request is a request that the master sends to a parameter server candidate, making it a parameter server available to the trainers. It contains information such as the parameter server index, the optimizaiton algorithm, the parameter save period, and the path for saving parameters. - -#### Parameter Server Selection - -The parameter server selection process selects parameter servers from parameter server candidates. It ensures the parameter servers that the trainers see are in consistent order, since the trainer needs to decide the parameter placement according to the consistent order. - -The selection process is as follows: - -- The master watches `/ps_candidate/` prefix in etcd. When a parameter server candidate joins and there is not enough parameter servers, the master will remove the candidate's entry in `/ps_candidate/` and send a [selection reqeust](#selection-request) to the candidate. 
Upon receiving the request, the candidate will set key `/ps/` in etcd with a lease to make itself available for the trainers. The `` is from the selection request. - - The graph below shows a parameter server candidate come online and then being selected, available for the trainers: - - - - -- The master watches `/ps/` prefix in etcd. When a selected parameter server went offline, the master will select a not yet selected parameter server candidate by sending the selection request to fill the missing parameter server spot. - - The graph below shows one parameter server is missing, the cluster management system created a new parameter server. The new parameter server announced itself as a candidate. Then the master filled the missing parameter server spot with the new candidate. - - - - - #### Task A task is a piece of sharded data to be trained. The total number of tasks will be much bigger than the total number of trainers. The number of data instances inside a task will be much bigger than the mini-batch size. @@ -67,7 +41,7 @@ A task is a piece of sharded data to be trained. The total number of tasks will Master process has three task queues to track training progress. As illustrated in the graph below, Job A and Job B both have one master process. Each master process has three task queues. - + - The todo queue holds tasks to be dispatched. When a job starts, the master process fills in the todo queue with all tasks. - The pending queue holds tasks that are currently training by trainers. @@ -75,7 +49,7 @@ Master process has three task queues to track training progress. As illustrated The life cycle of a single task is illustrated below: - + 1. When a new pass of training starts, all tasks will be placed in the todo queue. 1. The master process will dispatch few tasks to each trainer at a time, puts them in the pending queue and waits for completion. 
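The task life cycle described in the README hunk above (todo to pending on dispatch, pending to done on completion, back to todo on timeout or trainer failure, all tasks recycled at the start of a new pass) can be sketched as follows. This is a minimal, hypothetical Python model; `MasterQueues` and its method names are illustrative assumptions, not the actual PaddlePaddle master implementation:

```python
from collections import deque

class MasterQueues:
    """Illustrative sketch of the master's three task queues."""

    def __init__(self, tasks):
        self.todo = deque(tasks)  # tasks waiting to be dispatched
        self.pending = {}         # task -> task, currently being trained
        self.done = []            # tasks finished in the current pass

    def dispatch(self):
        """Move one task from todo to pending and hand it to a trainer."""
        task = self.todo.popleft()
        self.pending[task] = task
        return task

    def mark_done(self, task):
        """A trainer reported completion: move the task to done."""
        self.done.append(self.pending.pop(task))

    def mark_failed(self, task):
        """Timeout or trainer crash: return the task to todo for re-dispatch."""
        self.todo.append(self.pending.pop(task))

    def next_pass(self):
        """All tasks finished: recycle done back into todo for a new pass."""
        if not self.todo and not self.pending:
            self.todo = deque(self.done)
            self.done = []
```

In the real system these queues would additionally be persisted to etcd (see the fault-recovery hunks later in this patch) so a restarted master can recover them; the sketch only shows the in-memory state transitions.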
@@ -117,7 +91,7 @@ The communication pattern between the trainers and the parameter servers depends ## Fault Tolerant -The training job will pause if the master process is dead, or any of the parameter server process is dead. They will be started by the cluster management system and recover in few minutes. Please refer to [fault recovery](#fault-recovery). +The training job will pause if the master process is dead, or any of the parameter server processes is dead. They will be started by [Kubernetes](https://kubernetes.io/) and recover in a few minutes. Please refer to [fault recovery](#fault-recovery). The training job will continue to make progress if there is at least one training process running. The strategy depends on the type of optimization algorithm: @@ -133,13 +107,13 @@ The training job will continue to make progress if there is at least one trainin PaddlePaddle uses [etcd](https://github.com/coreos/etcd) to keep track of the states of processes. Because etcd is a distributed reliable key-value store, the restarted process can recover its states from etcd. The model parameter are periodically saved into distributed file system, so a restarted parameter server can recover its parameters from the saved file. -Now we will introduce how each process recovers from failure: +Now we will introduce how each process recovers from failure; the graph below provides an illustration: - + ### Master Process -When the master is started by the cluster management system, it executes the following steps at startup: +When the master is started by Kubernetes, it executes the following steps at startup: 1. Grabs a unique *master* lock in etcd, which prevents concurrent master instantiations. 1. Recovers the task queues from etcd if they already exists, otherwise the master will create them. @@ -148,11 +122,11 @@ When the master is started by the cluster management system, it executes the fol The master process will kill itself if its etcd lease expires.
-When the master process is dead for any reason, the cluster management system will restart it. It will be online again with all states recovered from etcd in few minutes. +When the master process is dead for any reason, Kubernetes will restart it. It will be online again with all states recovered from etcd in a few minutes. ### Trainer Process -When the trainer is started by the cluster management system, it executes the following steps at startup: +When the trainer is started by Kubernetes, it executes the following steps at startup: 1. Watches the available parameter server prefix keys `/ps/` on etcd and waits until count of parameter servers reaches the desired count. 1. Generates an unique ID, and sets key `/trainer/` with its contact address as value. The key will be deleted when the lease expires, so the master will be aware of the trainer being online and offline. @@ -162,13 +136,23 @@ If trainer's etcd lease expires, it will try set key `/trainer/` agai ### Parameter Server Process -When the parameter server is started by the cluster management system, it executes the following steps at startup: +When the parameter server is started by Kubernetes, it executes the following steps at startup: + +1. Read the desired total number of parameter servers from the etcd key `/ps_desired`. +1. Search through etcd keys `/ps/` (`/ps/0`, `/ps/1`, ...) to find the first non-existent key whose index is smaller than the total number of parameter servers. Set the key using a transaction to avoid concurrent writes. The parameter server's index is inferred from the key name. + + The desired number of parameter servers is 3: + + + + The third parameter server joined: + + 
The parameter server can load parameters if there are already saved parameters in the save path from selection request. Then Creates key `/ps/` with its contact address as value. +1. The parameter server can load parameters if there are already saved parameters in the save path (inferred from its index). 1. Now the parameter server is ready for the trainers' requests. -If the parameter server's etcd lease expires, the parameter server will save its parameters to the given save path and kill itself. +If the parameter server's etcd lease expires, the parameter server will kill itself. ## Dynamic Scaling diff --git a/doc/design/dist/src/arch.png b/doc/design/dist/src/arch.png index f75718a194090944707618c5c21e1a3967c6ba9b..30b5c34df2abd052ee9be7d8edd4a4dd111e501c 100644 Binary files a/doc/design/dist/src/arch.png and b/doc/design/dist/src/arch.png differ diff --git a/doc/design/dist/src/paddle-etcd.graffle b/doc/design/dist/src/paddle-etcd.graffle index 501bb8f98f4c3fb072b63498b0b35d2e5e6ff821..56681ae5bbe11849116d621b066a6317e003e4ca 100644 Binary files a/doc/design/dist/src/paddle-etcd.graffle and b/doc/design/dist/src/paddle-etcd.graffle differ diff --git a/doc/design/dist/src/paddle-etcd.png b/doc/design/dist/src/paddle-etcd.png index 0e9582ded16d2ecc746ef3225c3f3ebb5d18087d..4f9c9762b3a8c089dd5e9b2c07cb9dfc78296a21 100644 Binary files a/doc/design/dist/src/paddle-etcd.png and b/doc/design/dist/src/paddle-etcd.png differ diff --git a/doc/design/dist/src/paddle-ps-0-can.png b/doc/design/dist/src/paddle-ps-0-can.png deleted file mode 100644 index 064469c5fc6ecee9cc7c0814974795ccaac5139f..0000000000000000000000000000000000000000 Binary files a/doc/design/dist/src/paddle-ps-0-can.png and /dev/null differ diff --git a/doc/design/dist/src/paddle-ps-0-sel.png b/doc/design/dist/src/paddle-ps-0-sel.png deleted file mode 100644 index fe771850956e3f13089517a04c5077e2a587fa06..0000000000000000000000000000000000000000 Binary files a/doc/design/dist/src/paddle-ps-0-sel.png and 
/dev/null differ diff --git a/doc/design/dist/src/paddle-ps-0.png b/doc/design/dist/src/paddle-ps-0.png new file mode 100644 index 0000000000000000000000000000000000000000..0611183c7d177fd662ef4db57a2f10f2d479fd5b Binary files /dev/null and b/doc/design/dist/src/paddle-ps-0.png differ diff --git a/doc/design/dist/src/paddle-ps-1.png b/doc/design/dist/src/paddle-ps-1.png new file mode 100644 index 0000000000000000000000000000000000000000..350e47dfab4b683b4bffdb2f7fcb58f5edb2480a Binary files /dev/null and b/doc/design/dist/src/paddle-ps-1.png differ diff --git a/doc/design/dist/src/paddle-ps-etcd.graffle b/doc/design/dist/src/paddle-ps-etcd.graffle deleted file mode 100644 index 19705a5cdbc1f87b48dc5bffa5ad4d056b94b3d4..0000000000000000000000000000000000000000 Binary files a/doc/design/dist/src/paddle-ps-etcd.graffle and /dev/null differ diff --git a/doc/design/dist/src/paddle-ps-new-can.png b/doc/design/dist/src/paddle-ps-new-can.png deleted file mode 100644 index 2e6c4d1eca4f6266fd15f30a2765850be6bef151..0000000000000000000000000000000000000000 Binary files a/doc/design/dist/src/paddle-ps-new-can.png and /dev/null differ diff --git a/doc/design/dist/src/paddle-ps-new-sel.png b/doc/design/dist/src/paddle-ps-new-sel.png deleted file mode 100644 index 65d06e21f9774e71ac92462173cb41f65d44f99e..0000000000000000000000000000000000000000 Binary files a/doc/design/dist/src/paddle-ps-new-sel.png and /dev/null differ diff --git a/doc/design/dist/src/paddle-ps.graffle b/doc/design/dist/src/paddle-ps.graffle new file mode 100644 index 0000000000000000000000000000000000000000..ebebab84ae9f072f98275475bbb06bd6c520e8b1 Binary files /dev/null and b/doc/design/dist/src/paddle-ps.graffle differ diff --git a/doc/design/dist/src/paddle-task-queues.graffle b/doc/design/dist/src/paddle-task-queues.graffle index 957d1c7340275dc306b35541a319f9af5307e231..4263ed8bfd2ef0e55058828bf23f2fac3595e5fd 100644 Binary files a/doc/design/dist/src/paddle-task-queues.graffle and 
b/doc/design/dist/src/paddle-task-queues.graffle differ diff --git a/doc/design/dist/src/paddle-task-queues.png b/doc/design/dist/src/paddle-task-queues.png index 995f5819d4f1311b806def8410cdeb943408844f..5f980266795776752cebd0c346b85c4a75a47780 100644 Binary files a/doc/design/dist/src/paddle-task-queues.png and b/doc/design/dist/src/paddle-task-queues.png differ
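The revised parameter-server startup in the README hunk above (read `/ps_desired`, then claim the first non-existent `/ps/` key via a transaction) can be sketched as follows. This is a simplified Python model in which a plain dict stands in for the etcd key space, and `claim_ps_index` is a hypothetical helper rather than actual PaddlePaddle code:

```python
def claim_ps_index(etcd, ps_desired, my_addr):
    """Claim the first free /ps/<i> slot with i < ps_desired.

    `etcd` is a dict standing in for the etcd key space. A real
    implementation would put the key inside an etcd transaction that
    succeeds only if the key is still absent, so two candidates cannot
    claim the same index concurrently.
    """
    for i in range(ps_desired):
        key = "/ps/%d" % i
        if key not in etcd:       # first non-existent key
            etcd[key] = my_addr   # transactional put in real etcd
            return i              # index is inferred from the key name
    return None                   # all slots taken; do not serve
```

Because the index is derived from the key name alone, a restarted parameter server that reclaims a free slot can locate the saved parameters for that slot without any extra coordination, which is what makes the selection-free design in this patch work.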