# Design Doc: Parallel_Do in PaddlePaddle

In PaddlePaddle, we use the parallel_do primitive to represent multithreaded, data-parallel processing.

## Design overview

The definition of a parallel_do op looks like the following

```c++
AddInput(kInputs, "Inputs needed to be split onto different devices").AsDuplicable();
AddInput(kParameters, "Parameters are duplicated over different devices")
    .AsDuplicable();
AddInput(kPlaces, "Devices used for parallel processing");
AddOutput(kOutputs, "Outputs needed to be merged from different devices").AsDuplicable();
AddOutput(kParallelScopes,
          "Scopes for all local variables in forward pass. One scope for each device");
AddAttr<framework::BlockDesc *>(kParallelBlock,
                                "List of operaters to be executed in parallel");
```

A vanilla implementation of parallel_do can be sketched as follows (`|` means a single thread and
`||||` means multiple threads)

```
In the forward pass
  |      Split input onto different devices
  |      Copy parameter onto different devices
  ||||   Compute forward pass in parallel
  |      Merge output from different devices

In the backward pass
  |      Split output@grad onto different devices
  ||||   Compute backward pass in parallel
  |      Accumulate param@grad from different devices to the first device
  |      Merge input@grad from different devices
  |      Copy param@grad to the place of parallel_do_op
```
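
For intuition, here is a minimal Python sketch of this vanilla flow, with numpy arrays and Python
threads standing in for devices. `forward_fn` and `backward_fn` are hypothetical per-shard callables,
not part of the fluid API; the real op executes the attached block on each place instead.

```python
import numpy as np
from threading import Thread

def vanilla_parallel_do(inputs, param, forward_fn, backward_fn, num_devices):
    # |     Split input onto different devices
    input_shards = np.array_split(inputs, num_devices)
    # |     Copy parameter onto different devices
    param_copies = [param.copy() for _ in range(num_devices)]

    # ||||  Compute forward pass in parallel
    outputs = [None] * num_devices
    def forward(i):
        outputs[i] = forward_fn(input_shards[i], param_copies[i])
    threads = [Thread(target=forward, args=(i,)) for i in range(num_devices)]
    for t in threads: t.start()
    for t in threads: t.join()

    # |     Merge output from different devices
    merged = np.concatenate(outputs)

    # |     Split output@grad onto different devices (all-ones grad for illustration)
    sizes = np.cumsum([len(o) for o in outputs])[:-1]
    out_grad_shards = np.split(np.ones_like(merged), sizes)

    # ||||  Compute backward pass in parallel
    grads = [None] * num_devices
    def backward(i):
        grads[i] = backward_fn(input_shards[i], param_copies[i], out_grad_shards[i])
    threads = [Thread(target=backward, args=(i,)) for i in range(num_devices)]
    for t in threads: t.start()
    for t in threads: t.join()

    # |     Accumulate param@grad to the first device and merge input@grad
    param_grad = sum(g[1] for g in grads)
    input_grad = np.concatenate([g[0] for g in grads])
    return merged, input_grad, param_grad
```

With, for example, `forward_fn=lambda x, w: x @ w` and `backward_fn=lambda x, w, g: (g @ w.T, x.T @ g)`,
this reproduces the data-parallel semantics of a single fc layer.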

This implementation allows us to write a mixed-device program like this

```python
W1 = fluid.tensor(size=[100,20], parameter=True)
W2 = fluid.tensor(size=[20,15], parameter=True)

data = layers.data()

gpu_places = layers.get_place(use_gpu=True)
# parallel processing on multiple GPUs
pd = ParallelDo(gpu_places)
with pd.do(input=data):
    prediction = softmax(fc(fc(data, W1), W2))
    write_output(prediction)
prediction = pd()
loss = cross_entropy(prediction, label)
```

And the ProgramDesc looks like the following

```
# start_program will be run by executor(CPUPlace), all w1, w2 will be allocated on CPU
start_program
{
  vars: w1, w2
  ops: init(w1), init(w2)
}

main_program
{
block0 {
  vars: data, places, w1, w2, w1_grad, w2_grad,
  ops: data, get_place, parallel_do(block1),
       parallel_do_grad(block2),
       sgd(w2, w2_grad),
       sgd(w1, w1_grad)
}
block1 { # the forward pass
  parent_block: 0
  vars: data, h1, h2, loss
  ops: fc, fc, softmax
}
block2 { # the backward pass
  parent_block: 1
  vars: data_grad, h1_grad, h2_grad, loss_grad, local_w1_grad, local_w2_grad
  ops: softmax_grad,
       fc_grad
       fc_grad
}
}
```

## Performance Improvement

There are several places where we can make parallel_do faster.

### forward: split input onto different devices

If the input of the parallel_do is independent of any prior operators, we can avoid this step by
prefetching the input onto the different devices in a separate background thread. The Python code
looks like this.
```python
pd = ParallelDo(gpu_places)
with pd.do():
    feature = get_data_from_prefetch_queue(gpu_places)
    prediction = my_net(feature)
    write_output(prediction)
```
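
A minimal sketch of that background prefetching, using plain Python queues to stand in for
per-device buffers; `read_next_batch` and the per-device-index form of `get_data_from_prefetch_queue`
are illustrative assumptions, not the actual fluid helpers.

```python
import queue
import threading

NUM_DEVICES = 4
prefetch_queues = [queue.Queue(maxsize=2) for _ in range(NUM_DEVICES)]

def read_next_batch():
    # Hypothetical reader standing in for the real data source.
    return list(range(NUM_DEVICES * 8))

def prefetch_worker():
    # Background thread: split each batch and push one shard per device queue,
    # so the forward pass no longer needs its own "split input" step.
    while True:
        batch = read_next_batch()
        shard = len(batch) // NUM_DEVICES
        for i, q in enumerate(prefetch_queues):
            q.put(batch[i * shard:(i + 1) * shard])

threading.Thread(target=prefetch_worker, daemon=True).start()

def get_data_from_prefetch_queue(device_id):
    # Called inside pd.do(); each device pops its own pre-split shard.
    return prefetch_queues[device_id].get()
```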

### forward: Copy parameter onto different devices

We can avoid this step by keeping a copy of the parameter on each device. This requires:

1. `fluid.default_start_up_program()` to be run on all devices
1. In the backward pass, allreducing param@grad across the different devices (see the sketch at the end of this section), which requires
    1. `backward.py` to add `allreduce` operators at parallel_do_grad
    1. the `allreduce` operators to be called in async mode to achieve maximum throughput
1. Applying gradient-related ops (i.e. clipping, normalization, decay, sgd) on the different devices in parallel

By doing so, we also avoid the "backward: accumulate param@grad from different devices to the first device" step.
The ProgramDesc then looks like the following

```
# w1, w2 will be allocated on all GPUs
start_program
{
block0 {
  parallel_do(block1)
}
block1 {
  parent_block: 0
  vars: w1, w2
  ops: init(w1), init(w2)
}
}

main_program
{
block0 {
  vars: data, places, w1, w2
  ops: data, get_place, parallel_do(block1),
       parallel_do_grad(block2),      # append_backward
       parallel_do(block3)            # append_optimization
       
}
block1 {
  parent_block: 0
  vars: data, h1, h2, loss
  ops: fc, fc, softmax
}
block2 {
  parent_block: 1
  vars: data_grad, h1_grad, h2_grad, loss_grad, w1_grad, w2_grad
  ops: softmax_grad,
       fc_grad, allreduce(places, scopes, w1_grad),
       fc_grad, allreduce(places, scopes, w2_grad)
}
block3 {
  parent_block: 0
  vars: lr
  ops: sgd(w2, w2_grad),
       sgd(w1, w1_grad)
}
}
```
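
To make the `allreduce(places, scopes, w*_grad)` ops in block2 concrete, below is a numpy sketch of
the reduction they perform. The shapes and Python threads are illustrative only and the real
operators run asynchronously on the devices, but the semantics are the same: sum param@grad across
all devices and write the result back to every device, so that each local sgd op sees the full
gradient.

```python
import numpy as np
from threading import Thread

def allreduce(per_device_grads):
    # Sum param@grad across all device copies, then write the reduced value
    # back into every copy so each device can run its sgd op locally.
    total = sum(per_device_grads)
    for g in per_device_grads:
        g[...] = total

# w1_grad / w2_grad as computed independently on 4 devices.
w1_grad = [np.random.rand(100, 20) for _ in range(4)]
w2_grad = [np.random.rand(20, 15) for _ in range(4)]

# Launch the two allreduces concurrently, mirroring the "async mode" note above.
threads = [Thread(target=allreduce, args=(g,)) for g in (w1_grad, w2_grad)]
for t in threads: t.start()
for t in threads: t.join()

assert all(np.allclose(g, w1_grad[0]) for g in w1_grad)
```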