diff --git a/doc/design/dist/README.md b/doc/design/dist/README.md new file mode 100644 index 0000000000000000000000000000000000000000..1788208bcabca30f66cb1c80e80f6b824c0d9579 --- /dev/null +++ b/doc/design/dist/README.md @@ -0,0 +1,172 @@ +# Design Doc: Distributed Training + +## Objective + +In [this slides](https://www.slideshare.net/cxwangyi/paddlepaddle-a-complete-solution-for-businesses), we explained that we'd like PaddlePaddle running on general-purpose clusters like those managed by Kubernetes, so to address demands for AI from both Internet and non-Internet industries. + +This poses technical challenges to PaddlePaddle: + +1. Support fault-recovery. +1. Support both offline and online training. +1. [Serverless computing](https://en.wikipedia.org/wiki/Serverless_computing) of distributed training. + + +## Training Job + +A training job will be created once user asks Paddle cloud to train a model. The training job is made up of different processes that collaboratively consume data and produce a trained model. There are three kinds of processes: + +1. the *master process*, which dispatches tasks to +1. one or more *trainer processes*, which run distributed training and synchronize gradients/models via +1. one or more *parameter server processes*, where each holds a shard of the global model. + +Their relation is illustrated in the following graph: + + + +### Master Process + +The master process will: + +- Partition a dataset into [tasks](#task) and dispatch tasks to trainers. +- Keep track of training progress on the dataset with [task queue](#task-queue). A training job will iterate on the dataset for a full pass until it goes into next pass. + + +#### Task + +A task is a data shard to be trained. The total number of tasks will be much bigger than the total number of trainers. The number of data instances inside a task will be much bigger than the mini-batch size. + +#### Task Queue + +The master process has three task queues to track training progress. As illustrated in the graph below, Job A and Job B both have one master process. Each master process has three task queues. + + + +- The todo queue holds tasks to be dispatched. When a job starts, the master process fills in the todo queue with all tasks. +- The pending queue holds tasks that are currently training by trainers. +- the done queue holds tasks that are already trained. + +The life cycle of a single task is illustrated below: + + + +1. When a new pass of training starts, all tasks will be placed in the todo queue. +1. The master process will dispatch few tasks to each trainer at a time, puts them in the pending queue and waits for completion. +1. The trainer will work on its tasks and tell the master process once a task is completed. The master process will dispatch a new task to that trainer. +1. If a task timeout. the master process will move it back to the todo queue. The timeout count will increase by one. If the timeout count is above a threshold, the task is likely to cause a trainer to crash, so it will be discarded. +1. The master process will move completed task to the done queue. When the todo queue is empty, the master process will start a new pass by moving all tasks in the done queue to todo queue and reset the timeout counter of all tasks to zero. + +### Trainer Process + +The trainer process will: + +- Receive tasks from the master. +- Work on the tasks: calculate and upload gradient to parameter servers, and update local model by downloading new parameters from parameter servers. + +### Parameter Server Process + +Parameter server processes hold the parameters collaboratively. The parameters are partitioned on different parameter servers. + +The parameter server will: + +- Receive gradient from the trainers, update its parameters, and give the trainers the latest parameters. +- Periodically save its parameters to distributed file system by overriding the previous save. + +### Optimization Algorithms + +The communication pattern between the trainers and the parameter servers depends on the category of optimization algorithm: + +- Synchronous Stochastic Gradient Descent (sync-SGD) + + Parameter server will wait for all trainer finish n-th mini-batch calculation and send their gradients before broadcasting new parameters to every trainer. Every trainer will wait for the new parameters before starting n+1-th mini-batch. + +- Asynchronous Stochastic Gradient Descent (async-SGD) + + There will no synchronization between different trainers, and parameter server updates its parameter as soon as it receives new gradient: + + - Each trainer uploads its accumulated gradient every n mini-batches. + - Every m mini-batches, the trainer downloads new parameters from parameter server. + - n and m do not have to be equal. + +## Fault Tolerant + +The training job will pause if the master processes is dead, or any of the parameter server process is dead. They will be started by [Kubernetes](https://kubernetes.io/) and recover in few minutes. Please refer to [fault recovery](#fault-recovery). + +The training job will continue to make progress if there is at least one training process running. The strategy depends on the type of optimization algorithm: + +- sync-SGD + + TODO + +- async-SGD + + Since async-SGD does not require synchronization between mini-batches, the system will by definition make process if at least one trainer is running. + +## Fault Recovery + +PaddlePaddle uses [etcd](https://github.com/coreos/etcd) to keep track of the states of processes. Because etcd is a distributed reliable key-value store, the restarted process can recover its states from etcd. The model parameters are periodically saved into distributed file system, so a restarted parameter server can recover its parameters from the saved file. + +Now we will introduce how each process recovers from a failure, the graph below shows how etcd is used: + + + +### Master Process + +When the master is started by the Kubernetes, it executes the following steps at startup: + +1. Grabs a unique *master* lock in etcd, which prevents concurrent master instantiations. +1. Recovers the task queues from etcd if they already exist, otherwise, the master will create them. +1. Watches the trainer prefix keys `/trainer/` on etcd to find the live trainers. +1. Starts dispatching the tasks to the trainers, and updates task queue using an etcd transaction to ensure lock is held during the update. + +The master process will kill itself if its etcd lease expires. + +When the master process is dead for any reason, Kubernetes will restart it. It will be online again with all states recovered from etcd in few minutes. + +### Trainer Process + +When the trainer is started by the Kubernetes, it executes the following steps at startup: + +1. Watches the available parameter server prefix keys `/ps/` on etcd and waits until the count of parameter servers reaches the desired count. +1. Generates a unique ID, and sets key `/trainer/` with its contact address as value. The key will be deleted when the lease expires, so the master will be aware of the trainer being online and offline. +1. Waits for tasks from the master to start training. + +If trainer's etcd lease expires, it will try set key `/trainer/` again so that the master process can discover the trainer again. + +### Parameter Server Process + +When the parameter server is started by Kubernetes, it executes the following steps at startup: + +1. Read desired total number of parameter servers from etcd `/ps_desired` +1. Search through etcd keys `/ps/` (`/ps/0`, `/ps/1`, ...) to find the first non-existant key whose index is smaller than the total number of parameter servers. Set the key using a transaction to avoid concurrent writes. The parameter server's index is inferred from the key name. + + The desired number of parameter servers is 3: + + + + The third parameter server joined: + + + +1. The parameter server can load parameters if there are already saved parameters in the save path (inferred from its index). +1. Now the parameter server is ready for the trainers' requests. + +If the parameter server's etcd lease expires, the parameter server will kill itself. + + +## Dynamic Scaling + +### Trainer Scaling + +TODO + +### Parameter Server Scaling + +Not planned for v1. + +## Training Dataset Format + +TODO + +## User Interface + +TODO diff --git a/doc/design/dist/src/paddle-etcd.graffle b/doc/design/dist/src/paddle-etcd.graffle new file mode 100644 index 0000000000000000000000000000000000000000..56681ae5bbe11849116d621b066a6317e003e4ca Binary files /dev/null and b/doc/design/dist/src/paddle-etcd.graffle differ diff --git a/doc/design/dist/src/paddle-etcd.png b/doc/design/dist/src/paddle-etcd.png new file mode 100644 index 0000000000000000000000000000000000000000..4f9c9762b3a8c089dd5e9b2c07cb9dfc78296a21 Binary files /dev/null and b/doc/design/dist/src/paddle-etcd.png differ diff --git a/doc/design/dist/src/paddle-model-sharding.graffle b/doc/design/dist/src/paddle-model-sharding.graffle new file mode 100644 index 0000000000000000000000000000000000000000..fba30f0ca2b47f0d202a432821d95e55aac37ec8 Binary files /dev/null and b/doc/design/dist/src/paddle-model-sharding.graffle differ diff --git a/doc/design/dist/src/paddle-model-sharding.png b/doc/design/dist/src/paddle-model-sharding.png new file mode 100644 index 0000000000000000000000000000000000000000..8c3f6724ef46c6527e63a4cd8cb0b50fe0167124 Binary files /dev/null and b/doc/design/dist/src/paddle-model-sharding.png differ diff --git a/doc/design/dist/src/paddle-ps-0.png b/doc/design/dist/src/paddle-ps-0.png new file mode 100644 index 0000000000000000000000000000000000000000..47ef32806f182cab003da77f1556823b3f6d1721 Binary files /dev/null and b/doc/design/dist/src/paddle-ps-0.png differ diff --git a/doc/design/dist/src/paddle-ps-1.png b/doc/design/dist/src/paddle-ps-1.png new file mode 100644 index 0000000000000000000000000000000000000000..f3125db73096c52bac6e7c60e1675552857c0774 Binary files /dev/null and b/doc/design/dist/src/paddle-ps-1.png differ diff --git a/doc/design/dist/src/paddle-ps.graffle b/doc/design/dist/src/paddle-ps.graffle new file mode 100644 index 0000000000000000000000000000000000000000..0e536ffdd91cd696008b4c01bad3cb53edebdc16 Binary files /dev/null and b/doc/design/dist/src/paddle-ps.graffle differ diff --git a/doc/design/dist/src/paddle-task-queues.graffle b/doc/design/dist/src/paddle-task-queues.graffle new file mode 100644 index 0000000000000000000000000000000000000000..4263ed8bfd2ef0e55058828bf23f2fac3595e5fd Binary files /dev/null and b/doc/design/dist/src/paddle-task-queues.graffle differ diff --git a/doc/design/dist/src/paddle-task-queues.png b/doc/design/dist/src/paddle-task-queues.png new file mode 100644 index 0000000000000000000000000000000000000000..5f980266795776752cebd0c346b85c4a75a47780 Binary files /dev/null and b/doc/design/dist/src/paddle-task-queues.png differ diff --git a/doc/design/dist/src/paddle-task-states.graffle b/doc/design/dist/src/paddle-task-states.graffle new file mode 100644 index 0000000000000000000000000000000000000000..cf1a0b9246d9386a949d2dbb8c32fe84f72eea83 Binary files /dev/null and b/doc/design/dist/src/paddle-task-states.graffle differ diff --git a/doc/design/dist/src/paddle-task-states.png b/doc/design/dist/src/paddle-task-states.png new file mode 100644 index 0000000000000000000000000000000000000000..4ae43cb66c071aee9eb90d875e2373b29af9c3e0 Binary files /dev/null and b/doc/design/dist/src/paddle-task-states.png differ