# Design Doc: Parallel_Do in PaddlePaddle

Committed by Yang Yang(Tony)

In PaddlePaddle, we use the parallel_do primitive to represent multithreaded data-parallel processing.

## Design overview

The definition of a parallel_do op looks like the following:

```c++
AddInput(kInputs, "Inputs needed to be split onto different devices").AsDuplicable();
AddInput(kParameters, "Parameters are duplicated over different devices")
    .AsDuplicable();
AddInput(kPlaces, "Devices used for parallel processing");
AddOutput(kOutputs, "Outputs needed to be merged from different devices").AsDuplicable();
AddOutput(kParallelScopes,
          "Scopes for all local variables in forward pass. One scope for each device");
AddAttr<framework::BlockDesc *>(kParallelBlock,
                                "List of operators to be executed in parallel");
```
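To make the slot semantics concrete, here is a minimal pure-Python sketch, assuming a "place" is just a label and a scope is a plain dict. The helpers `split_inputs` and `make_parallel_scopes` are hypothetical illustrations, not PaddlePaddle APIs: entries of `Inputs` are sliced along the batch dimension, entries of `Parameters` are duplicated into every scope, and one scope is created per place, as `ParallelScopes` describes.

```python
from typing import Dict, List, Sequence


def split_inputs(batch: Sequence, num_places: int) -> List[list]:
    """Slice a batch into num_places contiguous shards (the Inputs slot)."""
    shard = (len(batch) + num_places - 1) // num_places  # ceil division
    return [list(batch[i * shard:(i + 1) * shard]) for i in range(num_places)]


def make_parallel_scopes(batch: Sequence, params: Dict[str, float],
                         places: Sequence[str]) -> List[dict]:
    """Build one scope per place: a shard of the input plus every parameter."""
    shards = split_inputs(batch, len(places))
    scopes = []
    for place, shard in zip(places, shards):
        scope = {"place": place, "input": shard}
        scope.update(params)  # Parameters slot: same values on every device
        scopes.append(scope)
    return scopes
```

For example, `make_parallel_scopes(list(range(10)), {"w1": 0.5}, ["gpu:0", "gpu:1", "gpu:2"])` gives the three devices the shards `[0, 1, 2, 3]`, `[4, 5, 6, 7]` and `[8, 9]`, each scope holding the same `w1`.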

A vanilla implementation of parallel_do can be shown as follows (`|` means single-threaded and
`||||` means multithreaded):

```
In the forward pass
  |      Split input onto different devices
  |      Copy parameter onto different devices
  ||||   Compute forward pass in parallel
  |      Merge output from different devices

In the backward pass
  |      Split output@grad onto different devices
  ||||   Compute backward pass in parallel
  |      Accumulate param@grad from different devices to the first device
  |      Merge input@grad from different devices
  |      Copy param@grad to the place of parallel_do_op
```
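The steps above can be simulated in a few lines of pure Python. This is a sketch under strong assumptions: the model is a scalar `y = w * x`, Python threads stand in for devices, and the function names are hypothetical. Because param@grad is a sum over samples, accumulating the per-device partial gradients on the first device yields the same value a single-device backward pass would.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def split(xs: List[float], n: int) -> List[List[float]]:
    """Split xs into n contiguous shards; trailing shards may be shorter."""
    k = (len(xs) + n - 1) // n
    return [xs[i * k:(i + 1) * k] for i in range(n)]


def merge(shards: List[List[float]]) -> List[float]:
    """Concatenate per-device results in device order."""
    return [v for shard in shards for v in shard]


def parallel_do_forward(x: List[float], w: float, n: int) -> List[float]:
    x_shards = split(x, n)  # |    split input onto different devices
    w_copies = [w] * n      # |    copy parameter onto different devices

    def local_forward(args: Tuple[List[float], float]) -> List[float]:
        xs, local_w = args
        return [local_w * v for v in xs]  # y = w * x on one device

    with ThreadPoolExecutor(max_workers=n) as pool:  # |||| forward in parallel
        y_shards = list(pool.map(local_forward, zip(x_shards, w_copies)))
    return merge(y_shards)  # |    merge output from different devices


def parallel_do_backward(x: List[float], w: float, y_grad: List[float],
                         n: int) -> Tuple[float, List[float]]:
    x_shards, g_shards = split(x, n), split(y_grad, n)  # | split output@grad

    def local_backward(args):
        xs, gs = args
        w_grad = sum(g * v for g, v in zip(gs, xs))  # dL/dw = sum(dL/dy * x)
        x_grad = [g * w for g in gs]                 # dL/dx = dL/dy * w
        return w_grad, x_grad

    with ThreadPoolExecutor(max_workers=n) as pool:  # |||| backward in parallel
        results = list(pool.map(local_backward, zip(x_shards, g_shards)))
    w_grad = sum(r[0] for r in results)      # | accumulate param@grad on device 0
    x_grad = merge([r[1] for r in results])  # | merge input@grad
    return w_grad, x_grad                    # | hand param@grad back to the caller
```

With `x = [1.0, 2.0, 3.0, 4.0, 5.0]`, `w = 2.0` and an all-ones output@grad on two devices, `parallel_do_backward` returns a param@grad of `15.0` (6.0 from the first shard plus 9.0 from the second). Python threads only illustrate the control flow here; under the GIL they do not model real device concurrency.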

This implementation allows us to write a mixed-device program like this:

```python
W1 = fluid.tensor(size=[100, 20], parameter=True)
W2 = fluid.tensor(size=[20, 15], parameter=True)

data = layers.data()
label = layers.data()

gpu_places = layers.get_place(use_gpu=True)
# parallel processing on multiple GPUs
pd = ParallelDo(gpu_places)
with pd.do(input=data):
    prediction = softmax(fc(fc(data, W1), W2))
    write_output(prediction)
# merge the per-device predictions; the loss runs on the place of the parallel_do op
prediction = pd()
loss = cross_entropy(prediction, label)
```

The resulting ProgramDesc looks like the following:

```
# start_program is run by executor(CPUPlace), so w1 and w2 are allocated on the CPU
start_program
{
  vars: w1, w2
  ops: init(w1), init(w2)
}

main_program
{
block0 {
  vars: data, places, w1, w2, w1_grad, w2_grad
  ops: data, get_place, parallel_do(block1),
       parallel_do_grad(block2),
       sgd(w2, w2_grad),
       sgd(w1, w1_grad)
}
block1 { # the forward pass
  parent_block: 0
  vars: data, h1, h2, loss
  ops: fc, fc, softmax
}
block2 { # the backward pass
  parent_block: 1
  vars: data_grad, h1_grad, h2_grad, loss_grad, local_w1_grad, local_w2_grad
  ops: softmax_grad,
       fc_grad,
       fc_grad
}
}
```
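The `parent_block` fields let ops in a sub-block read variables declared in an enclosing block, which is how block1 can use `w1` and `w2` from block0. A minimal sketch of that lookup, assuming a block is only a set of variable names plus a parent index; `Block` and `find_var_block` are hypothetical stand-ins rather than the framework's `BlockDesc`:

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class Block:
    idx: int
    parent_idx: Optional[int]  # None for block0
    vars: Set[str] = field(default_factory=set)


def find_var_block(blocks: List[Block], block_idx: int, name: str) -> Optional[int]:
    """Return the index of the nearest enclosing block that declares `name`."""
    cur: Optional[int] = block_idx
    while cur is not None:
        if name in blocks[cur].vars:
            return cur
        cur = blocks[cur].parent_idx  # walk up the parent chain only
    return None


# Variables of main_program from the listing above (ops omitted).
main_program = [
    Block(0, None, {"data", "places", "w1", "w2", "w1_grad", "w2_grad"}),
    Block(1, 0, {"data", "h1", "h2", "loss"}),
    Block(2, 1, {"data_grad", "h1_grad", "h2_grad", "loss_grad",
                 "local_w1_grad", "local_w2_grad"}),
]
```

Looking up `w1` from block2 walks from block2 to block1 to block0 and returns 0, while `local_w1_grad` is invisible from block1 because lookups only go up the parent chain, never down.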

## Performance Improvement

There are several places where we can make parallel_do faster.

### forward: split input onto different devices

If the input of parallel_do does not depend on any prior operator, we can avoid this step by
prefetching the input onto different devices in a separate background thread. The Python code
looks like this:
```python
pd = ParallelDo(gpu_places)
with pd.do():
    feature = get_data_from_prefetch_queue(gpu_places)
    prediction = my_net(feature)
    write_output(prediction)
```
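`get_data_from_prefetch_queue` is not specified further here, so the following is only a sketch of one way such a prefetcher could work, using Python's standard `threading` and `queue` modules. `DevicePrefetcher` is hypothetical: a background thread splits each batch and pushes one shard per device into a bounded queue, and each device's part of the parallel body pulls its shards from its own queue.

```python
import queue
import threading
from typing import Iterable, Iterator


class DevicePrefetcher:
    """Hypothetical helper: a background thread fills one bounded queue per
    device so the parallel body never waits on input splitting."""

    _END = object()  # sentinel marking the end of the stream

    def __init__(self, batches: Iterable[list], num_places: int, capacity: int = 2):
        self.queues = [queue.Queue(maxsize=capacity) for _ in range(num_places)]
        self._thread = threading.Thread(target=self._fill, args=(batches,), daemon=True)
        self._thread.start()

    def _fill(self, batches: Iterable[list]) -> None:
        n = len(self.queues)
        for batch in batches:
            k = (len(batch) + n - 1) // n
            for i, q in enumerate(self.queues):
                q.put(batch[i * k:(i + 1) * k])  # blocks while this queue is full
        for q in self.queues:
            q.put(self._END)

    def shards(self, place_id: int) -> Iterator[list]:
        """Yield the prefetched shards for one device until the stream ends."""
        q = self.queues[place_id]
        while True:
            item = q.get()
            if item is self._END:
                return
            yield item
```

The bounded queues provide back-pressure, but they also mean every device's queue must be drained concurrently, as the parallel threads of parallel_do would do; a consumer that reads only one queue to exhaustion would leave the producer blocked on another full queue.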

### forward: copy parameters onto different devices

We can avoid this step by giving each device its own copy of the parameters. This requires:

1. `fluid.default_startup_program()` to be run on all devices
1. In the backward pass, allreducing param@grad across devices, which requires
    1. `backward.py` to add `allreduce` operators in parallel_do_grad
    1. `allreduce` operators to be called in async mode to achieve maximum throughput
1. Gradient-related ops (e.g. clipping, normalization, decay, sgd) to be applied on different devices in parallel
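The requirements above rest on one invariant: if every device starts from the same parameters and applies the same allreduced gradient, the replicas stay identical without any parameter copies. A minimal sketch, assuming a naive sum-based allreduce over Python lists; real collective libraries use bandwidth-efficient algorithms such as ring allreduce, and all names here are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def allreduce_sum(local_grads: List[List[float]]) -> List[List[float]]:
    """Naive allreduce: every device ends up with the element-wise sum."""
    total = [sum(vals) for vals in zip(*local_grads)]
    return [list(total) for _ in local_grads]  # one copy per device


def sgd(param: List[float], grad: List[float], lr: float) -> List[float]:
    """Plain SGD update for one device's replica."""
    return [p - lr * g for p, g in zip(param, grad)]


def data_parallel_step(replicas: List[List[float]],
                       local_grads: List[List[float]],
                       lr: float) -> List[List[float]]:
    """Allreduce param@grad, then run the optimizer on every device in parallel."""
    reduced = allreduce_sum(local_grads)
    with ThreadPoolExecutor(max_workers=len(replicas)) as pool:
        return list(pool.map(lambda pg: sgd(pg[0], pg[1], lr),
                             zip(replicas, reduced)))
```

Whether the allreduce should sum or average depends on how the loss is scaled; the sketch sums, matching the accumulation step of the vanilla implementation.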

By doing so, we also avoid the "backward: accumulate param@grad from different devices to the first device" step.
The ProgramDesc looks like the following:

```
# w1, w2 will be allocated on all GPUs
start_program
{
block0 {
  parallel_do(block1)
}
block1 {
  parent_block: 0
  vars: w1, w2
  ops: init(w1), init(w2)
}
}

main_program
{
block0 {
  vars: data, places, w1, w2
  ops: data, get_place, parallel_do(block1),
       parallel_do_grad(block2),      # append_backward
       parallel_do(block3)            # append_optimization
}
block1 {
  parent_block: 0
  vars: data, h1, h2, loss
  ops: fc, fc, softmax
}
block2 {
  parent_block: 1
  vars: data_grad, h1_grad, h2_grad, loss_grad, w1_grad, w2_grad
  ops: softmax_grad,
       fc_grad, allreduce(places, scopes, w1_grad),
       fc_grad, allreduce(places, scopes, w2_grad)
}
block3 {
  parent_block: 0
  vars: lr
  ops: sgd(w2, w2_grad),
       sgd(w1, w1_grad)
}
}
```
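Placing each `allreduce` right after the `fc_grad` that produces its gradient, and running it asynchronously, lets communication for one parameter overlap with the backward computation of the next. A minimal sketch of that scheduling, assuming a single background worker thread stands in for a communication stream and the allreduce is a plain sum; `backward_with_async_allreduce` is hypothetical:

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple


def backward_with_async_allreduce(
        layer_grads: List[Tuple[str, Callable[[int], float]]],
        num_places: int) -> Tuple[Dict[str, float], List[str]]:
    """For each layer (ordered last to first), compute the per-device
    param@grad, then launch its allreduce on a background thread before
    moving on to the next layer's backward computation."""
    log: List[str] = []
    pending: Dict[str, Future] = {}
    with ThreadPoolExecutor(max_workers=1) as comm:  # stands in for a comm stream
        for name, grad_fn in layer_grads:
            local = [grad_fn(dev) for dev in range(num_places)]  # per-device grads
            log.append(f"computed {name}")
            pending[name] = comm.submit(sum, local)  # async allreduce (sum)
            log.append(f"launched allreduce {name}")
        # the optimizer (block3) may only run after every allreduce completes
        reduced = {name: fut.result() for name, fut in pending.items()}
    return reduced, log
```

The only synchronization point is before the optimizer, matching block3 running after parallel_do_grad in block0.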