From 51804e4d3c82b12b46a8723edad45a6c5e94c47a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AD=A3?= <2042519524@qq.com> Date: Mon, 29 Nov 2021 10:40:13 +0800 Subject: [PATCH] Complement the collective communication english docs (#37030) Co-authored-by: Chen Long <1300851984@qq.com> --- python/paddle/distributed/collective.py | 111 +++++++++++++++-- .../distributed/fleet/utils/recompute.py | 117 ++++++++++++++++-- python/paddle/distributed/utils.py | 63 ++++++++-- 3 files changed, 270 insertions(+), 21 deletions(-) diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index 6f12e902ff..b9e5789581 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -353,6 +353,13 @@ def broadcast(tensor, src, group=None, use_calc_stream=True): """ Broadcast a tensor from the source to all others. + As shown below, 4 GPUs each start 4 processes and GPU0 owns data 0. Through broadcast operator, + the data 0 will be sent to all GPUs from GPU0. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/broadcast.png + :width: 800 + :alt: broadcast + :align: center Args: tensor (Tensor): The Tensor to send if current rank is the source, or the tensor to receive otherwise. Its data type @@ -368,6 +375,7 @@ def broadcast(tensor, src, group=None, use_calc_stream=True): Examples: .. code-block:: python + # required: distributed import numpy as np import paddle from paddle.distributed import init_parallel_env @@ -420,6 +428,14 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True): """ Reduce a tensor over all ranks so that all get the result. + As shown below, 4 GPUs each start 4 processes and the data on each GPU is represnted + by the GPU number. The reduce operator is sum. Through all_reduce operator, + each GPU will have the sum of the data from all GPUs. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allreduce.png + :width: 800 + :alt: all_reduce + :align: center Args: tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type @@ -435,6 +451,7 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True): Examples: .. code-block:: python + # required: distributed import numpy as np import paddle from paddle.distributed import ReduceOp @@ -499,7 +516,14 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True): def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): """ - Reduce a tensor to the destination from all others. + Reduce a tensor to the destination from all others. As shown below, 4 GPUs each start 4 processes and the data on each GPU is respresnted + by the GPU number. The destination of the reduce operator is GPU0 and the process is sum. Through reduce operator, + the GPU0 will owns the sum of all data from all GPUs. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/reduce.png + :width: 800 + :alt: reduce + :align: center Args: tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type @@ -516,6 +540,7 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): Examples: .. code-block:: python + # required: distributed import numpy as np import paddle from paddle.distributed import init_parallel_env @@ -593,7 +618,15 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True): def all_gather(tensor_list, tensor, group=None, use_calc_stream=True): """ - Gather tensors from all participators and all get the result. + Gather tensors from all participators and all get the result. As shown + below, 4 GPUs each start 4 processes and the data on each GPU is represnted + by the GPU number. Through the all_gather operator, each GPU will have data + from all GPUs. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allgather.png + :width: 800 + :alt: all_gather + :align: center Args: tensor_list (list): A list of output Tensors. Every element in the list must be a Tensor whose data type @@ -610,6 +643,7 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True): Examples: .. code-block:: python + # required: distributed import numpy as np import paddle from paddle.distributed import init_parallel_env @@ -670,7 +704,13 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True): def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True): """ - Scatter a tensor to all participators. + Scatter a tensor to all participators. As shown below, 4 GPUs each start 4 processes and the source of the scatter + is GPU0. Through scatter operator, the data in GPU0 will be sent to all GPUs averagely. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/scatter.png + :width: 800 + :alt: scatter + :align: center Args: tensor (Tensor): The output Tensor. Its data type @@ -688,12 +728,11 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True): Examples: .. code-block:: python + # required: distributed import numpy as np import paddle from paddle.distributed import init_parallel_env - # required: gpu - paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id) init_parallel_env() if paddle.distributed.ParallelEnv().local_rank == 0: @@ -1265,16 +1304,66 @@ def split(x, to N/2 and are mapped to all zeros after embedding. Finally, the results on the two devices are sum-reduced. + The Embedding put on single card is as shown below: + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_embedding_single.png + :width: 800 + :height: 350 + :alt: single_embedding + :align: center + + Parallel Embedding is shown as below: + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_embedding_split.png + :width: 800 + :alt: split_embedding + :align: center + Case 2: Row Parallel Linear The weight of the linear operation is a NxM matrix with N rows and M columns. With row parallel linear, the weight is split into num_partitions partitions, each of which is a matrix with N/num_partitions rows and M column. + The linear layer put on single card is shown as below, the input variable is represented by X, + the weight matrix is represented by W and the output vaiable is O. The linear layer on single card is + simple matrix multiplication operation, O = X * W. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_single.png + :width: 800 + :alt: single_linear + :align: center + + Row Parallel Linear is shown as below. As the name suggests, Row Parallel Linear splits the weight matrix W into + [[W_row1], [W_row2]] along the row. And accordingly the input is splitted along the column into [X_col1, X_col2] and multiply their + respective weight matrices. Finally apply AllReduce on the output from each card to get the final output. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_row.png + :width: 800 + :alt: split_row + :align: center + Case 3: Column Parallel Linear The weight of the linear operation is a NxM matrix with N rows and M columns. With column parallel linear, the weight is split into num_paratitions partitions, each of which is a matrix with N rows and M/num_partitions column. + The linear layer put on single card has been illustrated on case 2 and Column Parallel Linear + is shown as below. The Column Parallel Linear splits the weight matrix W into [W_col1, W_col2] along the column and + these splitted matrices respectively multiply the input. Finally apply AllGather on the output from each card to get the final output. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_col.png + :width: 800 + :alt: split_col + :align: center + + As observed, the column parallel linear and row parallel linear can be combined to skip one ALLGATHER communication + operator. Furthermore the Attention and MLP can be combined to imporve the performance as shown below. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_col_row.png + :width: 800 + :alt: split_col_row + :align: center + Args: x (Tensor): Input tensor. It's data type should be float16, float32, float64, int32 or int64. size (list|tuple): A list or tuple with two elements indicating the shape of the weight. @@ -1398,8 +1487,16 @@ def split(x, def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True): """ - Scatter tensors in in_tensor_list to all participators and gather the result tensors in out_tensor_list. - + Scatter tensors in in_tensor_list to all participators averagely and gather the result tensors in out_tensor_list. + As shown below, the in_tensor_list in GPU0 includes 0_0 and 0_1, and GPU1 includes 1_0 and 1_1. + Through alltoall operator, the 0_0 in GPU0 will be sent to GPU0 and 0_1 to GPU1, 1_0 in GPU1 sent to GPU0 and 1_1 to GPU1. + Finally the out_tensor_list in GPU0 includes 0_0 and 1_0, and GPU1 includes 0_1 and 1_1. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/alltoall.png + :width: 800 + :alt: alltoall + :align: center + Args: in_tensor_list (list): A list of input Tensors. Every element in the list must be a Tensor whose data type should be float16, float32, float64, int32 or int64. diff --git a/python/paddle/distributed/fleet/utils/recompute.py b/python/paddle/distributed/fleet/utils/recompute.py index 2d1db5db94..29c541cfb1 100755 --- a/python/paddle/distributed/fleet/utils/recompute.py +++ b/python/paddle/distributed/fleet/utils/recompute.py @@ -183,15 +183,118 @@ def recompute(function, *args, **kwargs): """ recompute intermediate activations to save then memory. - Args: - function: layer of sequence of layers that describes part of forward pass of the model whose - intermediate activations will be released to save memory in forward stage and will be recomputed - in backward stage for gradient calculation. - preserve_rng_state(bool, optional): if preserve the RNG state of forward and restore it in backward. - args: inputs to the function + Parameters: + function(paddle.nn.Sequential): layer of sequence of layers that describes part of forward pass of the model + whose intermediate activations will be released to save memory in forward stage and will be recomputed + in backward stage for gradient calculation. + *args(Tensor): inputs to the function. + **kwargs(Dict): Kwargs should only contain the key-value pair of preserve_rng_state, which is used to + indicate whether to save the forward rng. If it is True, then the last forward rng value will be + restored when the forward recalculation of backpropagation is performed. The default + preserve_rng_state is True. Returns: - Output of function on args + Output of function on args. + + Examples: + .. code-block:: python + + import numpy as np + import paddle + from paddle.distributed.fleet.utils import recompute + import random + + # required: gpu + + def get_fc_block(block_idx, input_size, is_last=False): + block_name = "block_" + str(block_idx) + block = paddle.nn.Sequential( + (block_name + "_fc_0", paddle.nn.Linear(input_size, input_size, bias_attr=False)), + (block_name + "_dropout", paddle.nn.Dropout(p=0.5)), + (block_name + "_relu_1", paddle.nn.ReLU()), + (block_name + "_fc_1", paddle.nn.Linear(input_size, input_size, bias_attr=False)), + (block_name + "_relu_2", paddle.nn.ReLU()), + ) + if is_last: + block.add_sublayer( + block_name + "_fc_2", + paddle.nn.Linear( + input_size, 1, bias_attr=False + ) + ) + else: + block.add_sublayer( + block_name + "_fc_2", + paddle.nn.Linear(input_size, input_size, bias_attr=False) + ) + + return block + + + class Naive_fc_net(paddle.nn.Layer): + def __init__(self, input_size=10, + recompute_blocks=[1, 3], + recompute_kwargs={}): + super(Naive_fc_net, self).__init__() + self.recompute_blocks = recompute_blocks + self.recompute_kwargs = recompute_kwargs + self.runfunc0 = get_fc_block(0, input_size, is_last=False) + self.runfunc1 = get_fc_block(1, input_size, is_last=False) + self.runfunc2 = get_fc_block(2, input_size, is_last=False) + self.runfunc3 = get_fc_block(3, input_size, is_last=False) + self.runfunc4 = get_fc_block(4, input_size, is_last=True) + self.total_func = [self.runfunc0, self.runfunc1, self.runfunc2, self.runfunc3, self.runfunc4] + + def forward(self, inputs): + nums = len(self.total_func) + for i in range(nums): + if i in self.recompute_blocks: + inputs = recompute(self.total_func[i], inputs, **{"preserve_rng_state": True}) + else: + inputs = self.total_func[i](inputs) + return inputs + + def run_model(cuda_state, recompute_block=[], recompute_kwargs={}): + gen = paddle.seed(10) + gen.manual_seed(10) + np.random.seed(10) + random.seed(10) + if cuda_state: + paddle.set_cuda_rng_state(cuda_state) + + batch_size, input_size = 1, 10 + model = Naive_fc_net( + input_size, + recompute_blocks=recompute_block, + recompute_kwargs=recompute_kwargs) + optimizer = paddle.optimizer.SGD(learning_rate=0.01, parameters=model.parameters()) + loss_ = [] + param_ = [] + grad_ = [] + for _ in range(5): + x_data = np.random.randn(batch_size, input_size).astype(np.float32) + x = paddle.to_tensor(x_data) + y_pred = model(x) + loss = y_pred.mean() + loss_.append(np.asarray(loss).tolist()) + loss.backward() + optimizer.step() + param_.append(np.asarray(model.parameters()[9]).tolist()) + grad_.append(np.asarray(model.parameters()[3]._grad_ivar()).tolist()) + optimizer.clear_grad() + + return loss_, param_, grad_ + + cuda_state = paddle.get_cuda_rng_state() + # without recompute + loss_ref, param_ref, grad_ref = run_model( + cuda_state, recompute_block=[] + ) + + loss, param, grad = run_model(cuda_state, recompute_block=[1, 2]) + print("normal_loss: {}, recompute_loss: {}".format(loss_ref, loss)) + # The result of the recompute_loss should be the same as the normal_loss. + """ # Hack to mix *args with **kwargs in a python 2.7-compliant way preserve = kwargs.pop('preserve_rng_state', True) diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils.py index 1c27a0018f..8fa06adba2 100644 --- a/python/paddle/distributed/utils.py +++ b/python/paddle/distributed/utils.py @@ -60,11 +60,39 @@ def global_scatter(x, group=None, use_calc_stream=True): """ - Scatter data in x which has been put together belong to one expert - to n_expert * world_size exeperts according to local_count and receive tensors - from n_expert * world_size experts according - to global_count. + The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count, + and then receives data according to global_count. The expert refers to a user-defined expert network, + n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. + As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. + The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). + In the global_scatter operator, local_count[i] represents sending local_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, + global_count[i] represents receiving global_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The rank in the + figure respresent the rank of the current card in all cards. + + The process of global_scatter sending data is as follows: + + local_count[0] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 0th card; + + local_count[1] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 0th card; + + local_count[2] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 1th card; + + local_count[3] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 1th card; + + Therefore, the global_count[0] of the 0th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; + + the global_count[1] of the 0th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert; + + the global_count[0] of the 1th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; + + the global_count[1] of the 1th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png + :width: 800 + :alt: global_scatter_gather + :align: center + Args: x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64. local_count (Tensor): Tensor which have n_expert * world_size elements that indicates @@ -154,9 +182,30 @@ def global_gather(x, group=None, use_calc_stream=True): """ - Gather data in x to n_expert * world_size exeperts according to - local_count and receive tensors from n_expert * world_size experts according - to global_count. + The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count. + The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. + + As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. + The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). + In the global_gather operator, the meaning of the global_count and local_count is opposed to global_scatter, global_count[i] represents sending global_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, + local_count[i] represents receiving local_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The data sent will be arranged according to the experts of each card. + The rank in the figure respresent the rank of the current card in all cards. + + The process of global_gather sending data is as follows: + + The global_count[0] of the 0th card represents sending 2 data to the 0th expert of the 0th card; + + The global_count[1] of the 0th card represents sending 0 data to the 1th expert of the 0th card; + + The global_count[0] of the 1th card represents sending 2 data to the 0th expert of the 0th card; + + The global_count[1] of the 1th card represents sending 0 data to the 1th expert of the 0th card. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png + :width: 800 + :alt: global_scatter_gather + :align: center + Args: x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64. -- GitLab