diff --git a/doc/design/parallel_do.md b/doc/design/parallel_do.md
new file mode 100644
index 0000000000000000000000000000000000000000..221af6b6a48c325902ba7e5aeb7f67b1154c15c4
--- /dev/null
+++ b/doc/design/parallel_do.md
@@ -0,0 +1,162 @@
# Design Doc: Parallel_Do in PaddlePaddle

In PaddlePaddle, we use the parallel_do primitive to represent multi-threaded data-parallel processing.

## Design overview

The definition of a parallel_do op looks like the following:

```c++
AddInput(kInputs, "Inputs to be split onto different devices").AsDuplicable();
AddInput(kParameters, "Parameters duplicated over different devices")
    .AsDuplicable();
AddInput(kPlaces, "Devices used for parallel processing");
AddOutput(kOutputs, "Outputs to be merged from different devices").AsDuplicable();
AddOutput(kParallelScopes,
          "Scopes for all local variables in the forward pass. One scope for each device");
AddAttr(kParallelBlock,
        "List of operators to be executed in parallel");
```

A vanilla implementation of parallel_do can be shown as the following (`|` means single thread and
`||||` means multiple threads):

```
In the forward pass
  |    Split input onto different devices
  |    Copy parameters onto different devices
  |||| Compute the forward pass in parallel
  |    Merge outputs from different devices

In the backward pass
  |    Split output@grad onto different devices
  |||| Compute the backward pass in parallel
  |    Accumulate param@grad from different devices onto the first device
  |    Merge input@grad from different devices
  |    Copy param@grad to the place of parallel_do_op
```

This implementation allows writing mixed-device programs like this:

```python
# get embedding feature on CPU
feature = some_cpu_only_op(data)

gpu_places = get_place(use_gpu=True)
# parallel processing on multiple GPUs
pd = ParallelDo(gpu_places)
with pd.do():
    read_input(feature)
    prediction = my_net(feature)
    write_output(prediction)
prediction = pd()
loss = cross_entropy(prediction, label)
```

And the ProgramDesc looks like the following:

```
# start_program will be run by executor(CPUPlace); both w1 and w2 will be allocated on CPU
start_program
{
  vars: w1, w2
  ops: init(w1), init(w2)
}

main_program
{
block0 {
  vars: data, places, w1, w2
  ops: data, get_place, parallel_do(block1),
       parallel_do_grad(block2),
       sgd(w2, w2_grad),
       sgd(w1, w1_grad)
}
block1 {
  parent_block: 0
  vars: data, h1, h2, loss
  ops: fc, fc, softmax
}
block2 {
  parent_block: 1
  vars: data_grad, h1_grad, h2_grad, loss_grad, w1_grad, w2_grad
  ops: softmax_grad,
       fc_grad,
       fc_grad
}
}
```

## Performance Improvement

There are several places where we can make this parallel_do faster.

### forward: split input onto different devices

If the input of the parallel_do is independent of any prior operators, we can avoid this step by
prefetching the input onto the different devices in a separate background thread. The Python code
looks like this:

```python
pd = ParallelDo(gpu_places)
with pd.do():
    feature = get_data_from_prefetch_queue(gpu_places)
    prediction = my_net(feature)
    write_output(prediction)
```
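For illustration only, the prefetch queue could be filled by an ordinary background thread that splits each batch and pushes one shard per device; the helpers `split` and `copy_to_device` below are hypothetical, not existing fluid APIs.

```python
import queue
import threading

def start_prefetcher(reader, places, capacity=8):
    """Sketch: fill one queue per device from a background thread."""
    queues = [queue.Queue(maxsize=capacity) for _ in places]

    def worker():
        for batch in reader():                           # read batches on the CPU
            shards = split(batch, len(places))           # hypothetical split helper
            for q, shard, place in zip(queues, shards, places):
                q.put(copy_to_device(shard, place))      # hypothetical host-to-device copy

    threading.Thread(target=worker, daemon=True).start()
    return queues

# Inside the parallel_do block, the thread for device i would then simply do:
#   feature = queues[i].get()
```

Because the split and copy of the next batch happen while the current iteration is still computing, their cost is hidden behind the compute threads.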
### forward: Copy parameters onto different devices

We can avoid this step by making each device hold a copy of the parameters. This requires:

1. `fluid.default_start_up_program()` to be run on all devices.
1. In the backward pass, allreduce param@grad across the different devices. This requires:
    1. `backward.py` adds `allreduce` operators inside parallel_do_grad (see the sketch at the end of this doc).
    1. The `allreduce` operators are called in async mode to achieve maximum throughput.
1. The gradient-related ops (i.e. clipping, normalization, decay, sgd) are applied on the different devices in parallel.

By doing so, we also avoid "backward: accumulate param@grad from different devices onto the first device".
The ProgramDesc then looks like the following:

```
# w1, w2 will be allocated on all GPUs
start_program
{
block0 {
  ops: parallel_do(block1)
}
block1 {
  parent_block: 0
  vars: w1, w2
  ops: init(w1), init(w2)
}
}

main_program
{
block0 {
  vars: data, places, w1, w2
  ops: data, get_place, parallel_do(block1),
       parallel_do_grad(block2),   # append_backward
       parallel_do(block3)         # append_optimization
}
block1 {
  parent_block: 0
  vars: data, h1, h2, loss
  ops: fc, fc, softmax
}
block2 {
  parent_block: 1
  vars: data_grad, h1_grad, h2_grad, loss_grad, w1_grad, w2_grad
  ops: softmax_grad,
       fc_grad, allreduce(places, scopes, w1_grad),
       fc_grad, allreduce(places, scopes, w2_grad)
}
block3 {
  parent_block: 0
  vars: lr
  ops: sgd(w2, w2_grad),
       sgd(w1, w1_grad)
}
}
```
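For concreteness, below is a minimal sketch of how `backward.py` could append the `allreduce` operators shown in `block2` above. The op type `allreduce`, its input/output names, and the `async` attribute are assumptions of this design, not an existing fluid API.

```python
def append_allreduce_ops(grad_block, places, scopes, param_grad_names):
    """Sketch: add one allreduce op per parameter gradient inside parallel_do_grad."""
    for grad_name in param_grad_names:             # e.g. ["w1_grad", "w2_grad"]
        grad_block.append_op(
            type="allreduce",                      # assumed op type from this design
            inputs={"X": [grad_name],
                    "Places": places,
                    "Scopes": scopes},
            outputs={"Out": [grad_name]},          # reduce the gradient in place across devices
            attrs={"async": True})                 # assumption: run asynchronously
```

With the async attribute, the allreduce of one parameter's gradient can overlap with the computation of the next `fc_grad`, which is where the extra throughput comes from.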