# Distributed Training

## Introduction

In this article, we'll explain how to run distributed training jobs with PaddlePaddle on different types of clusters. The diagram below shows the main architecture of a distributed training job:

<img src="https://user-images.githubusercontent.com/13348433/31772146-41523d84-b511-11e7-8a12-a69fd136c283.png" width="500">

- Data shard: the training data is split into multiple partitions, and each trainer uses its partitions of the whole dataset to do the training job.
- Trainer: each trainer reads its data shard and trains the neural network. The trainer then uploads the calculated "gradients" to the parameter servers and waits for the parameters to be optimized on the parameter server side. When that finishes, the trainer downloads the optimized parameters and continues its training.
- Parameter server: every parameter server stores part of the whole neural network model data. Parameter servers run the optimization calculations when gradients are uploaded from trainers, and then send the updated parameters back to the trainers.

PaddlePaddle supports both synchronous stochastic gradient descent (SGD) and asynchronous SGD.

When training with synchronous SGD, PaddlePaddle uses an internal "synchronization barrier" that makes gradient updates and parameter downloads happen in strict order. Asynchronous SGD, on the other hand, does not wait for all trainers to finish uploading at a single step, which increases the parallelism of distributed training: parameter servers do not depend on each other, so they optimize parameters concurrently, and they do not wait for trainers, so trainers also work concurrently. However, asynchronous SGD introduces more randomness and noise into the gradients.
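
The difference between the two modes can be illustrated with a toy, single-process sketch. This is not PaddlePaddle code: the function names `sync_step` and `async_step`, the scalar parameter `w`, and the quadratic loss are all assumptions made for illustration only.

```python
# Toy illustration only; not PaddlePaddle's implementation.
# Each "trainer" holds one data point x and minimizes the loss (w - x)^2 / 2,
# so its gradient with respect to the parameter w is (w - x).

def gradient(w, x):
    return w - x


def sync_step(w, shards, lr):
    """Synchronous SGD: wait for every trainer, then update once."""
    grads = [gradient(w, x) for x in shards]   # all trainers see the same w
    return w - lr * sum(grads) / len(grads)    # barrier, then a single update


def async_step(w, shards, lr):
    """Asynchronous SGD: apply each gradient as soon as it arrives."""
    for x in shards:                           # arrival order is arbitrary
        w = w - lr * gradient(w, x)            # later trainers see a newer w
    return w


shards = [1.0, 2.0, 3.0]
w_sync = sync_step(0.0, shards, lr=0.5)
w_async = async_step(0.0, shards, lr=0.5)
print(w_sync, w_async)  # 1.0 2.125
```

In the asynchronous version the result depends on the order in which gradients arrive, which is one way to picture the extra randomness mentioned above.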

## Preparations
1. Prepare your computer cluster. Normally this is a group of Linux servers connected by a LAN. Each server is assigned a unique IP address. The computers in the cluster are called "nodes".
2. Install PaddlePaddle on every node. If you are going to take advantage of GPU cards, you'll also need to install the proper driver and CUDA libraries. To install PaddlePaddle, please read the [build and install](http://www.paddlepaddle.org/docs/develop/documentation/en/getstarted/build_and_install/index_en.html) document. We strongly recommend using the [Docker installation](http://www.paddlepaddle.org/docs/develop/documentation/en/getstarted/build_and_install/docker_install_en.html).

After installation, you can check the version by typing the command below (if you are using Docker, first start a container with `docker run -it paddlepaddle/paddle:[tag] /bin/bash`):

```bash
$ paddle version
PaddlePaddle 0.10.0rc, compiled with
    with_avx: ON
    with_gpu: OFF
    with_double: OFF
    with_python: ON
    with_rdma: OFF
    with_timer: OFF
```

We'll use `doc/howto/usage/cluster/src/word2vec` as an example to introduce distributed training with the PaddlePaddle v2 API.

## Command-line arguments

### Starting parameter server

Type the command below to start a parameter server, which will wait for trainers to connect:

```bash
$ paddle pserver --port=7164 --ports_num=1 --ports_num_for_sparse=1 --num_gradient_servers=1
```

To run a parameter server in the background and save its output to a log file, type:
```bash
$ stdbuf -oL /usr/bin/nohup paddle pserver --port=7164 --ports_num=1 --ports_num_for_sparse=1 --num_gradient_servers=1 &> pserver.log &
```

Parameter Description

- port: **required, default 7164**, port which the parameter server will listen on. If ports_num is greater than 1, the parameter server will listen on multiple ports for more network throughput.
- ports_num: **required, default 1**, total number of ports the parameter server will listen on.
- ports_num_for_sparse: **required, default 0**, number of ports that serve sparse parameter updates.
- num_gradient_servers: **required, default 1**, total number of gradient servers.
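
When parameter servers are launched from a script, the four flags above can be assembled programmatically. The helper below is a minimal sketch: `pserver_command` is a hypothetical name, not part of PaddlePaddle, and it only formats the flags documented in this section with their stated defaults.

```python
def pserver_command(port=7164, ports_num=1, ports_num_for_sparse=0,
                    num_gradient_servers=1):
    """Build the `paddle pserver` argument list from the four documented flags."""
    return [
        "paddle", "pserver",
        "--port=%d" % port,
        "--ports_num=%d" % ports_num,
        "--ports_num_for_sparse=%d" % ports_num_for_sparse,
        "--num_gradient_servers=%d" % num_gradient_servers,
    ]


# Reproduce the command shown earlier in this section.
cmd = pserver_command(ports_num_for_sparse=1)
print(" ".join(cmd))
```

The returned list can be passed as-is to `subprocess.Popen` on a node where PaddlePaddle is installed.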

### Starting trainer
Type the command below to start the trainer (you can name the file whatever you want, for example `train.py`):

```bash
$ python train.py
```

Trainers must be able to reach the parameter servers over the network to finish the job, and they need to know the port and IPs to locate the parameter servers. You can pass these arguments to trainers through [environment variables](https://en.wikipedia.org/wiki/Environment_variable) or pass them to the `paddle.init()` function. Arguments passed to `paddle.init()` override environment variables.

Use environment variables:

```bash
export PADDLE_INIT_USE_GPU=False
export PADDLE_INIT_TRAINER_COUNT=1
export PADDLE_INIT_PORT=7164
export PADDLE_INIT_PORTS_NUM=1
export PADDLE_INIT_PORTS_NUM_FOR_SPARSE=1
export PADDLE_INIT_NUM_GRADIENT_SERVERS=1
export PADDLE_INIT_TRAINER_ID=0
export PADDLE_INIT_PSERVERS=127.0.0.1
python train.py
```

Pass arguments:

```python
paddle.init(
        use_gpu=False,
        trainer_count=1,
        port=7164,
        ports_num=1,
        ports_num_for_sparse=1,
        num_gradient_servers=1,
        trainer_id=0,
        pservers="127.0.0.1")
```

Parameter Description

- use_gpu: **optional, default False**, set to "True" to enable GPU training.
- trainer_count: **required, default 1**, total count of trainers in the training job.
- port: **required, default 7164**, port to connect to the parameter server.
- ports_num: **required, default 1**, number of ports for communication.
- ports_num_for_sparse: **required, default 0**, number of ports for sparse-type calculation.
- num_gradient_servers: **required, default 1**, total number of gradient servers.
- trainer_id: **required, default 0**, ID of each trainer, starting from 0.
- pservers: **required, default 127.0.0.1**, list of IPs of parameter servers, separated by ",".
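
The precedence rule described in this section (explicit arguments win over `PADDLE_INIT_*` environment variables) can be sketched in plain Python. The helper `resolve_init_args` is hypothetical and is not how `paddle.init()` is implemented; it only mimics the documented behavior, and it leaves environment values as strings.

```python
# Hypothetical helper, not part of PaddlePaddle: it only mimics the documented
# precedence between PADDLE_INIT_* environment variables and explicit arguments.
def resolve_init_args(environ, **explicit):
    prefix = "PADDLE_INIT_"
    args = {}
    for key, value in environ.items():
        if key.startswith(prefix):
            # PADDLE_INIT_TRAINER_ID -> trainer_id (value stays a string)
            args[key[len(prefix):].lower()] = value
    args.update(explicit)  # explicit arguments override the environment
    return args


env = {"PADDLE_INIT_PORT": "7164", "PADDLE_INIT_TRAINER_ID": "0", "HOME": "/root"}
args = resolve_init_args(env, trainer_id=2)
print(sorted(args.items()))  # [('port', '7164'), ('trainer_id', 2)]
```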

### Prepare Training Dataset

The example script [prepare.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/prepare.py) downloads the public `imikolov` dataset and splits it into multiple files according to the job parallelism (the number of trainers). Modify `SPLIT_COUNT` at the beginning of `prepare.py` to change the number of output files.

In the real world, the output of a `MapReduce` job is often used as training data, so there will be lots of files. You can use the modulo operation (`mod`) on each file's numeric suffix to assign training files to trainers:

```python
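import os

# Example values: set these from your job configuration.
TRAINER_COUNT = 3  # total number of trainers in the job
TRAINER_ID = 0     # ID of this trainer, starting from 0

train_list = []
for f in os.listdir("/train_data/"):
    # File names end with a numeric suffix, e.g. "train.txt-00001".
    suffix = int(f.split("-")[1])
    if suffix % TRAINER_COUNT == TRAINER_ID:
        train_list.append(f)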
```
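
The same modulo rule can be wrapped in a function so that it is easy to check without a real `/train_data/` directory. This is a sketch under stated assumptions: `assign_files` is a hypothetical name, and skipping files that have no numeric suffix (such as the unsplit `train.txt`) is a choice made here, not something the original snippet does.

```python
def assign_files(filenames, trainer_count, trainer_id):
    """Return the files whose numeric suffix maps to this trainer."""
    assigned = []
    for name in sorted(filenames):
        _, sep, suffix = name.rpartition("-")
        if not sep or not suffix.isdigit():
            continue  # skip files without a "-NNNNN" suffix, e.g. "train.txt"
        if int(suffix) % trainer_count == trainer_id:
            assigned.append(name)
    return assigned


files = ["train.txt", "train.txt-00000", "train.txt-00001",
         "train.txt-00002", "train.txt-00003"]
print(assign_files(files, trainer_count=2, trainer_id=1))
# ['train.txt-00001', 'train.txt-00003']
```

Every suffixed file is assigned to exactly one trainer, so the trainers together cover the dataset without overlap.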

The example `prepare.py` splits the training data and the testing data into 3 files each, with numeric suffixes such as `-00000`, `-00001` and `-00002`:

```
train.txt
train.txt-00000
train.txt-00001
train.txt-00002
test.txt
test.txt-00000
test.txt-00001
test.txt-00002
```
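
A file layout like the one above can be produced with a few lines of Python. The sketch below is not the code from `prepare.py`, whose splitting strategy may differ: `split_file` is a hypothetical helper that distributes lines round-robin, and only the `-NNNNN` suffix format is taken from the listing above.

```python
import os
import tempfile


def split_file(path, split_count):
    """Write the lines of `path` round-robin into `path-00000`, `path-00001`, ..."""
    out_paths = ["%s-%05d" % (path, i) for i in range(split_count)]
    outputs = [open(p, "w") for p in out_paths]
    try:
        with open(path) as src:
            for index, line in enumerate(src):
                outputs[index % split_count].write(line)
    finally:
        for f in outputs:
            f.close()
    return out_paths


# Demonstrate on a small throwaway file.
workdir = tempfile.mkdtemp()
source = os.path.join(workdir, "train.txt")
with open(source, "w") as f:
    f.write("a\nb\nc\nd\n")
parts = split_file(source, 3)
print([os.path.basename(p) for p in parts])
# ['train.txt-00000', 'train.txt-00001', 'train.txt-00002']
```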

When the job starts, every trainer needs to get its own part of the data. Some distributed systems provide a storage service, so the data under a shared path can be accessed by all the trainer nodes. Without a storage service, you must copy the training data to each trainer node.

Different training jobs may have different data formats and `reader()` functions, so developers may need to write their own data preparation scripts and `reader()` functions for their job.
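
As a rough illustration of what such a `reader()` might look like, the sketch below assumes the convention that a reader is a function taking no arguments and yielding one sample at a time. The name `cluster_reader` and the sample format (a list of whitespace-separated tokens per line) are assumptions for illustration; the word2vec example uses its own format.

```python
import os
import tempfile


def cluster_reader(file_paths):
    """Return a reader: a no-argument function that yields one sample at a time."""
    def reader():
        for path in file_paths:
            with open(path) as f:
                for line in f:
                    yield line.split()  # one sample = the tokens of one line
    return reader


# Usage with a throwaway shard file.
shard = os.path.join(tempfile.mkdtemp(), "train.txt-00000")
with open(shard, "w") as f:
    f.write("the quick fox\njumps over\n")
samples = list(cluster_reader([shard])())
print(samples)  # [['the', 'quick', 'fox'], ['jumps', 'over']]
```

Each trainer would pass only its own list of shard files, so the reader never touches data that belongs to another trainer.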

### Prepare Training Program

We'll create a *workspace* directory on each node to store your training program, its dependencies, and the mounted or downloaded dataset directory.


Your workspace may look like this:
```
.
|-- my_lib.py
|-- word_dict.pickle
|-- train.py
|-- train_data_dir/
|   |-- train.txt-00000
|   |-- train.txt-00001
|   |-- train.txt-00002
`-- test_data_dir/
    |-- test.txt-00000
    |-- test.txt-00001
    `-- test.txt-00002
```

- `my_lib.py`: user-defined libraries, such as PIL. This is optional.
- `word_dict.pickle`: dictionary file for training word embeddings.
- `train.py`: training program. Sample code: [api_train_v2_cluster.py](https://github.com/PaddlePaddle/Paddle/tree/develop/doc/howto/usage/cluster/src/word2vec/api_train_v2_cluster.py). ***NOTE:*** You may need to modify the head part of `train.py` when using a different cluster platform, in order to retrieve configuration environment variables:

  ```python
  import os

  cluster_train_file = "./train_data_dir/train/train.txt"
  cluster_test_file = "./test_data_dir/test/test.txt"
  # OpenMPI exposes the rank of each process through this variable.
  node_id = os.getenv("OMPI_COMM_WORLD_RANK")
  if not node_id:
      raise EnvironmentError("must provide OMPI_COMM_WORLD_RANK")
  ```

- `train_data_dir`: contains the training data. Mount it from a storage service or copy the training data here.
- `test_data_dir`: contains the testing data.
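
To keep the head part of `train.py` portable across platforms, the node ID lookup can try several environment variables in turn. This is only a sketch: `get_node_id` is a hypothetical helper, and falling back to `PADDLE_INIT_TRAINER_ID` is an assumption chosen for illustration; use whichever variable your platform actually sets.

```python
import os


def get_node_id(environ=os.environ,
                candidates=("OMPI_COMM_WORLD_RANK", "PADDLE_INIT_TRAINER_ID")):
    """Return the first node ID found among the candidate variables, as an int."""
    for name in candidates:
        value = environ.get(name)
        if value not in (None, ""):
            return int(value)
    raise EnvironmentError("must provide one of: " + ", ".join(candidates))


# A plain dict stands in for the process environment here.
print(get_node_id({"OMPI_COMM_WORLD_RANK": "2"}))  # 2
```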

## Use cluster platforms or cluster management tools

PaddlePaddle supports running jobs on several platforms including:
- [Kubernetes](http://kubernetes.io): an open-source system from Google for automating deployment, scaling, and management of containerized applications.
- [OpenMPI](https://www.open-mpi.org): a mature high-performance parallel computing framework.
- [Fabric](http://www.fabfile.org): a cluster management tool. Write scripts to submit jobs or manage the cluster.

We'll introduce cluster job management on these platforms. The examples can be found under [cluster_train_v2](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/scripts/cluster_train_v2).

When a job is dispatched to different nodes, these cluster platforms provide an API or environment variables to the training processes, exposing information such as the node ID, the node IP, and the total number of nodes.

## Use different clusters

  - [fabric](fabric_en.md)
  - [openmpi](openmpi_en.md)
  - [kubernetes](k8s_en.md)
  - [kubernetes on AWS](k8s_aws_en.md)