Unverified commit 51804e4d, authored by 李季, committed by GitHub

Complement the collective communication english docs (#37030)

Co-authored-by: Chen Long <1300851984@qq.com>
Parent ae544242
......@@ -353,6 +353,13 @@ def broadcast(tensor, src, group=None, use_calc_stream=True):
"""
Broadcast a tensor from the source to all others.
As shown below, one process is started on each of the 4 GPUs, and GPU0 owns data 0. Through the broadcast operator,
data 0 will be sent from GPU0 to all GPUs.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/broadcast.png
:width: 800
:alt: broadcast
:align: center
Args:
tensor (Tensor): The Tensor to send if current rank is the source, or the tensor to receive otherwise. Its data type
......@@ -368,6 +375,7 @@ def broadcast(tensor, src, group=None, use_calc_stream=True):
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
......@@ -420,6 +428,14 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True):
"""
Reduce a tensor over all ranks so that all get the result.
As shown below, one process is started on each of the 4 GPUs, and the data on each GPU is represented
by its GPU number. The reduce operation is sum. Through the all_reduce operator,
each GPU will hold the sum of the data from all GPUs.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allreduce.png
:width: 800
:alt: all_reduce
:align: center
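A minimal 2-GPU sketch of this behaviour (values illustrative only):
.. code-block:: python

    # required: distributed  (2-GPU sketch, values illustrative only)
    import paddle
    from paddle.distributed import init_parallel_env

    paddle.set_device('gpu:%d' % paddle.distributed.ParallelEnv().dev_id)
    init_parallel_env()
    rank = paddle.distributed.ParallelEnv().local_rank
    data = paddle.to_tensor([rank, rank])   # GPU i holds [i, i]
    paddle.distributed.all_reduce(data)     # default op is ReduceOp.SUM
    # After the call, data on every GPU is [0 + 1, 0 + 1] = [1, 1].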
Args:
tensor (Tensor): The input Tensor. It also works as the output Tensor. Its data type
......@@ -435,6 +451,7 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True):
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import ReduceOp
......@@ -499,7 +516,14 @@ def all_reduce(tensor, op=ReduceOp.SUM, group=None, use_calc_stream=True):
def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True):
"""
Reduce a tensor to the destination from all others. As shown below, one process is started on each of the 4 GPUs, and the data on each GPU is represented
by its GPU number. The destination of the reduce operator is GPU0 and the reduce operation is sum. Through the reduce operator,
GPU0 will own the sum of the data from all GPUs.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/reduce.png
:width: 800
:alt: reduce
:align: center
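A minimal 2-GPU sketch (values illustrative only); only the destination rank is guaranteed to hold the reduced result:
.. code-block:: python

    # required: distributed  (2-GPU sketch, values illustrative only)
    import paddle
    from paddle.distributed import init_parallel_env

    paddle.set_device('gpu:%d' % paddle.distributed.ParallelEnv().dev_id)
    init_parallel_env()
    rank = paddle.distributed.ParallelEnv().local_rank
    data = paddle.to_tensor([rank, rank])    # GPU i holds [i, i]
    paddle.distributed.reduce(data, dst=0)   # sum-reduce onto GPU0
    # After the call, data on GPU0 is [1, 1]; other ranks should not rely on their buffer.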
Args:
tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type
......@@ -516,6 +540,7 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True):
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
......@@ -593,7 +618,15 @@ def reduce(tensor, dst, op=ReduceOp.SUM, group=None, use_calc_stream=True):
def all_gather(tensor_list, tensor, group=None, use_calc_stream=True):
"""
Gather tensors from all participators and all get the result. As shown
below, one process is started on each of the 4 GPUs, and the data on each GPU is represented
by its GPU number. Through the all_gather operator, each GPU will have the data
from all GPUs.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/allgather.png
:width: 800
:alt: all_gather
:align: center
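A minimal 2-GPU sketch (values illustrative only):
.. code-block:: python

    # required: distributed  (2-GPU sketch, values illustrative only)
    import paddle
    from paddle.distributed import init_parallel_env

    paddle.set_device('gpu:%d' % paddle.distributed.ParallelEnv().dev_id)
    init_parallel_env()
    rank = paddle.distributed.ParallelEnv().local_rank
    tensor_list = []
    data = paddle.to_tensor([rank])                   # GPU i holds [i]
    paddle.distributed.all_gather(tensor_list, data)
    # After the call, tensor_list on every GPU is [[0], [1]].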
Args:
tensor_list (list): A list of output Tensors. Every element in the list must be a Tensor whose data type
......@@ -610,6 +643,7 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True):
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
......@@ -670,7 +704,13 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True):
def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True):
"""
Scatter a tensor to all participators. As shown below, one process is started on each of the 4 GPUs and the source of the scatter
is GPU0. Through the scatter operator, the data on GPU0 will be divided evenly and sent to all GPUs.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/scatter.png
:width: 800
:alt: scatter
:align: center
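A minimal 2-GPU sketch (values illustrative only); only the source rank needs to provide tensor_list:
.. code-block:: python

    # required: distributed  (2-GPU sketch, values illustrative only)
    import paddle
    from paddle.distributed import init_parallel_env

    paddle.set_device('gpu:%d' % paddle.distributed.ParallelEnv().dev_id)
    init_parallel_env()
    rank = paddle.distributed.ParallelEnv().local_rank
    data = paddle.zeros([2], dtype='float32')         # receive buffer on every GPU
    if rank == 0:
        tensor_list = [paddle.to_tensor([0., 0.]), paddle.to_tensor([1., 1.])]
        paddle.distributed.scatter(data, tensor_list, src=0)
    else:
        paddle.distributed.scatter(data, src=0)
    # After the call, GPU0 holds [0., 0.] and GPU1 holds [1., 1.].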
Args:
tensor (Tensor): The output Tensor. Its data type
......@@ -688,12 +728,11 @@ def scatter(tensor, tensor_list=None, src=0, group=None, use_calc_stream=True):
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
# required: gpu
paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
init_parallel_env()
if paddle.distributed.ParallelEnv().local_rank == 0:
......@@ -1265,16 +1304,66 @@ def split(x,
to N/2 and are mapped to all zeros after embedding. Finally, the results on the two
devices are sum-reduced.
The embedding put on a single card is shown below:
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_embedding_single.png
:width: 800
:height: 350
:alt: single_embedding
:align: center
Parallel Embedding is shown below:
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_embedding_split.png
:width: 800
:alt: split_embedding
:align: center
Case 2: Row Parallel Linear
The weight of the linear operation is an NxM matrix with N rows and M columns.
With row parallel linear, the weight is split into num_partitions partitions, each
of which is a matrix with N/num_partitions rows and M columns.
The linear layer put on a single card is shown below: the input variable is represented by X,
the weight matrix by W, and the output variable by O. The linear layer on a single card is a
simple matrix multiplication, O = X * W.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_single.png
:width: 800
:alt: single_linear
:align: center
Row Parallel Linear is shown below. As the name suggests, Row Parallel Linear splits the weight matrix W into
[[W_row1], [W_row2]] along its rows. Accordingly, the input is split along its columns into [X_col1, X_col2], and each part is multiplied by its
respective weight matrix. Finally, AllReduce is applied to the outputs from all cards to get the final output.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_row.png
:width: 800
:alt: split_row
:align: center
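The identity behind Row Parallel Linear can be checked with plain NumPy; the sketch below illustrates the math only, not the split API:
.. code-block:: python

    # NumPy sketch of the row-parallel identity (illustration only)
    import numpy as np

    N, M, batch = 6, 4, 2
    X = np.random.rand(batch, N)
    W = np.random.rand(N, M)

    # split W along its rows and X along its columns into 2 partitions
    W_row1, W_row2 = W[:N // 2, :], W[N // 2:, :]
    X_col1, X_col2 = X[:, :N // 2], X[:, N // 2:]

    # each "card" computes a partial product; the AllReduce step sums them
    partial = [X_col1 @ W_row1, X_col2 @ W_row2]
    assert np.allclose(sum(partial), X @ W)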
Case 3: Column Parallel Linear
The weight of the linear operation is an NxM matrix with N rows and M columns.
With column parallel linear, the weight is split into num_partitions partitions, each
of which is a matrix with N rows and M/num_partitions columns.
The linear layer put on a single card has been illustrated in Case 2, and Column Parallel Linear
is shown below. Column Parallel Linear splits the weight matrix W into [W_col1, W_col2] along its columns, and
the input is multiplied by each of these split matrices. Finally, AllGather is applied to the outputs from all cards to get the final output.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_col.png
:width: 800
:alt: split_col
:align: center
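Likewise, the Column Parallel Linear identity can be checked with plain NumPy (an illustration of the math only, not the split API):
.. code-block:: python

    # NumPy sketch of the column-parallel identity (illustration only)
    import numpy as np

    N, M, batch = 6, 4, 2
    X = np.random.rand(batch, N)
    W = np.random.rand(N, M)

    # split W along its columns into 2 partitions; every card sees the full X
    W_col1, W_col2 = W[:, :M // 2], W[:, M // 2:]

    # each "card" computes a slice of the output; the AllGather step concatenates them
    out = np.concatenate([X @ W_col1, X @ W_col2], axis=1)
    assert np.allclose(out, X @ W)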
As observed, column parallel linear and row parallel linear can be combined to skip one AllGather communication
operation. Furthermore, the Attention and MLP blocks can be combined to improve performance, as shown below.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_col_row.png
:width: 800
:alt: split_col_row
:align: center
Args:
x (Tensor): Input tensor. It's data type should be float16, float32, float64, int32 or int64.
size (list|tuple): A list or tuple with two elements indicating the shape of the weight.
......@@ -1398,8 +1487,16 @@ def split(x,
def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True):
"""
Scatter tensors in in_tensor_list evenly to all participators and gather the result tensors in out_tensor_list.
As shown below, in_tensor_list on GPU0 includes 0_0 and 0_1, and on GPU1 includes 1_0 and 1_1.
Through the alltoall operator, 0_0 on GPU0 is sent to GPU0 and 0_1 to GPU1, while 1_0 on GPU1 is sent to GPU0 and 1_1 to GPU1.
Finally, out_tensor_list on GPU0 includes 0_0 and 1_0, and on GPU1 includes 0_1 and 1_1.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/alltoall.png
:width: 800
:alt: alltoall
:align: center
Args:
in_tensor_list (list): A list of input Tensors. Every element in the list must be a Tensor whose data type
should be float16, float32, float64, int32 or int64.
......
......@@ -183,15 +183,118 @@ def recompute(function, *args, **kwargs):
"""
Recompute intermediate activations to save memory.
Parameters:
function(paddle.nn.Sequential): layer or sequence of layers that describes part of the forward pass of the model
whose intermediate activations will be released to save memory in the forward stage and will be recomputed
in the backward stage for gradient calculation.
*args(Tensor): inputs to the function.
**kwargs(Dict): Kwargs should only contain the key-value pair of preserve_rng_state, which indicates whether
to save the forward RNG state. If it is True, the last forward RNG state will be restored when the forward
pass is recomputed during backpropagation. The default value of preserve_rng_state is True.
Returns:
Output of function on args.
Examples:
.. code-block:: python
import numpy as np
import paddle
from paddle.distributed.fleet.utils import recompute
import random
# required: gpu
def get_fc_block(block_idx, input_size, is_last=False):
block_name = "block_" + str(block_idx)
block = paddle.nn.Sequential(
(block_name + "_fc_0", paddle.nn.Linear(input_size, input_size, bias_attr=False)),
(block_name + "_dropout", paddle.nn.Dropout(p=0.5)),
(block_name + "_relu_1", paddle.nn.ReLU()),
(block_name + "_fc_1", paddle.nn.Linear(input_size, input_size, bias_attr=False)),
(block_name + "_relu_2", paddle.nn.ReLU()),
)
if is_last:
block.add_sublayer(
block_name + "_fc_2",
paddle.nn.Linear(
input_size, 1, bias_attr=False
)
)
else:
block.add_sublayer(
block_name + "_fc_2",
paddle.nn.Linear(input_size, input_size, bias_attr=False)
)
return block
class Naive_fc_net(paddle.nn.Layer):
def __init__(self, input_size=10,
recompute_blocks=[1, 3],
recompute_kwargs={}):
super(Naive_fc_net, self).__init__()
self.recompute_blocks = recompute_blocks
self.recompute_kwargs = recompute_kwargs
self.runfunc0 = get_fc_block(0, input_size, is_last=False)
self.runfunc1 = get_fc_block(1, input_size, is_last=False)
self.runfunc2 = get_fc_block(2, input_size, is_last=False)
self.runfunc3 = get_fc_block(3, input_size, is_last=False)
self.runfunc4 = get_fc_block(4, input_size, is_last=True)
self.total_func = [self.runfunc0, self.runfunc1, self.runfunc2, self.runfunc3, self.runfunc4]
def forward(self, inputs):
nums = len(self.total_func)
for i in range(nums):
if i in self.recompute_blocks:
inputs = recompute(self.total_func[i], inputs, **{"preserve_rng_state": True})
else:
inputs = self.total_func[i](inputs)
return inputs
def run_model(cuda_state, recompute_block=[], recompute_kwargs={}):
gen = paddle.seed(10)
gen.manual_seed(10)
np.random.seed(10)
random.seed(10)
if cuda_state:
paddle.set_cuda_rng_state(cuda_state)
batch_size, input_size = 1, 10
model = Naive_fc_net(
input_size,
recompute_blocks=recompute_block,
recompute_kwargs=recompute_kwargs)
optimizer = paddle.optimizer.SGD(learning_rate=0.01, parameters=model.parameters())
loss_ = []
param_ = []
grad_ = []
for _ in range(5):
x_data = np.random.randn(batch_size, input_size).astype(np.float32)
x = paddle.to_tensor(x_data)
y_pred = model(x)
loss = y_pred.mean()
loss_.append(np.asarray(loss).tolist())
loss.backward()
optimizer.step()
param_.append(np.asarray(model.parameters()[9]).tolist())
grad_.append(np.asarray(model.parameters()[3]._grad_ivar()).tolist())
optimizer.clear_grad()
return loss_, param_, grad_
cuda_state = paddle.get_cuda_rng_state()
# without recompute
loss_ref, param_ref, grad_ref = run_model(
cuda_state, recompute_block=[]
)
loss, param, grad = run_model(cuda_state, recompute_block=[1, 2])
print("normal_loss: {}, recompute_loss: {}".format(loss_ref, loss))
# The result of the recompute_loss should be the same as the normal_loss.
"""
# Hack to mix *args with **kwargs in a python 2.7-compliant way
preserve = kwargs.pop('preserve_rng_state', True)
......
......@@ -60,11 +60,39 @@ def global_scatter(x,
group=None,
use_calc_stream=True):
"""
The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count,
and then receives data according to global_count. The expert refers to a user-defined expert network,
n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, world_size is 2, n_expert is 2, the batch size of x is 4, and local_count is [2, 0, 2, 0].
The global_count of rank 0 is [2, 0, , ] and that of rank 1 is [2, 0, , ] (due to limited space, only the data computed on rank 0 is shown here).
In the global_scatter operator, local_count[i] means sending local_count[i] batches of data to the (i % n_expert)th expert of the (i // n_expert)th card,
and global_count[i] means receiving global_count[i] batches of data from the (i // n_expert)th card for the (i % n_expert)th expert of this card. The rank in the
figure represents the rank of the current card among all cards.
The process of global_scatter sending data is as follows:
local_count[0] represents taking out 2 batches from x and sending them to the 0th expert of the 0th card;
local_count[1] represents taking out 0 batches from x and sending them to the 1st expert of the 0th card;
local_count[2] represents taking out 2 batches from x and sending them to the 0th expert of the 1st card;
local_count[3] represents taking out 0 batches from x and sending them to the 1st expert of the 1st card.
Therefore, global_count[0] of the 0th card equals 2, which means its 0th expert receives 2 batches of data from the 0th card;
global_count[1] of the 0th card equals 0, which means its 1st expert receives 0 batches of data from the 0th card;
global_count[0] of the 1st card equals 2, which means its 0th expert receives 2 batches of data from the 0th card;
global_count[1] of the 1st card equals 0, which means its 1st expert receives 0 batches of data from the 0th card.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
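The indexing rule above can be illustrated with a few lines of plain Python using the counts from the figure (an illustration only, not the operator itself):
.. code-block:: python

    # Pure-Python illustration of the local_count indexing (not the operator itself)
    n_expert, world_size = 2, 2
    local_count = [2, 0, 2, 0]          # counts used in the figure

    offset = 0
    for i, cnt in enumerate(local_count):
        card, expert = i // n_expert, i % n_expert
        print("send batches x[%d:%d] to expert %d on card %d"
              % (offset, offset + cnt, expert, card))
        offset += cnt
    # Prints: x[0:2] -> expert 0 / card 0, x[2:2] -> expert 1 / card 0,
    #         x[2:4] -> expert 0 / card 1, x[4:4] -> expert 1 / card 1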
Args:
x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64.
local_count (Tensor): Tensor which has n_expert * world_size elements that indicates
......@@ -154,9 +182,30 @@ def global_gather(x,
group=None,
use_calc_stream=True):
"""
The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count.
The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, world_size is 2, n_expert is 2, the batch size of x is 4, and local_count is [2, 0, 2, 0].
The global_count of rank 0 is [2, 0, , ] and that of rank 1 is [2, 0, , ] (due to limited space, only the data computed on rank 0 is shown here).
In the global_gather operator, the meanings of global_count and local_count are the opposite of those in global_scatter: global_count[i] means sending global_count[i] batches of data to the (i % n_expert)th expert of the (i // n_expert)th card,
and local_count[i] means receiving local_count[i] batches of data from the (i // n_expert)th card for the (i % n_expert)th expert of this card. The data sent is arranged according to the experts of each card.
The rank in the figure represents the rank of the current card among all cards.
The process of global_gather sending data is as follows:
global_count[0] of the 0th card represents sending 2 batches of data to the 0th expert of the 0th card;
global_count[1] of the 0th card represents sending 0 batches of data to the 1st expert of the 0th card;
global_count[0] of the 1st card represents sending 2 batches of data to the 0th expert of the 0th card;
global_count[1] of the 1st card represents sending 0 batches of data to the 1st expert of the 0th card.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
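The same indexing, seen from the sending side of global_gather, can be illustrated with the figure's counts (an illustration only, not the operator itself):
.. code-block:: python

    # Pure-Python illustration of the global_count indexing in global_gather
    # (counts from the figure; an illustration only, not the operator itself)
    n_expert = 2
    global_count = [2, 0]   # first two entries of rank 0's global_count in the figure

    for i, cnt in enumerate(global_count):
        card, expert = i // n_expert, i % n_expert
        print("send %d batches to expert %d of card %d" % (cnt, expert, card))
    # Rank 0 sends 2 batches to expert 0 of card 0 and 0 batches to expert 1 of card 0.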
Args:
x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64.
......