diff --git a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
index b107015f1afaf13df4edf6a85824fdd4281fc0c5..1dcb84c66ac8191bfcb59834b5843eca462d3454 100755
--- a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
+++ b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
@@ -36,7 +36,11 @@ if _use_four_directions:
 else:
     from .pp_utils import p2p_communication as p2p
 
-from .pp_utils.utils import HOOK_ACTION, FusedCommBuffer, assign_group_by_size
+from paddle.distributed.fleet.utils.tensor_fusion_helper import (
+    assign_group_by_size,
+)
+
+from .pp_utils.utils import HOOK_ACTION, FusedCommBuffer
 
 __all__ = []
 
diff --git a/python/paddle/distributed/fleet/meta_parallel/pp_utils/utils.py b/python/paddle/distributed/fleet/meta_parallel/pp_utils/utils.py
index 760fbc1d72af1b49c7b4f4a3f880f73796ff6ed2..e7d31b4aebd07a6f619feefe7c0fa3cb86f5b73b 100644
--- a/python/paddle/distributed/fleet/meta_parallel/pp_utils/utils.py
+++ b/python/paddle/distributed/fleet/meta_parallel/pp_utils/utils.py
@@ -12,27 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from collections import OrderedDict
-
-import numpy as np
 import paddle
 from paddle import _legacy_C_ops
-from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_storage import (
-    GradStorage,
+from paddle.distributed.fleet.utils.tensor_fusion_helper import (
+    flatten_dense_tensors,
 )
-from paddle.fluid import core
 from paddle.framework import base as imperative_base
 
-alignment = {
-    "gpu": 256,
-}
-align = {
-    paddle.float16.value: 2,
-    paddle.bfloat16.value: 2,
-    paddle.float32.value: 4,
-}
-
 __all__ = []
 
@@ -131,35 +118,6 @@ def _all_gather(tensor, group=None, use_calc_stream=True):
     )
 
 
-def flatten_dense_tensors(parameters, use_main_grad=False):
-    _buffer_size = 0
-    _param2align = {}
-    dtype = paddle.float32 if use_main_grad else parameters[0].dtype
-
-    for param in parameters:
-        assert param.trainable, "param must be trainable..."
-        size = np.prod(param.shape) * align[dtype]
-        remaining = size % alignment["gpu"]
-        ali = 0 if remaining == 0 else alignment["gpu"] - remaining
-        align_ = ali // align[dtype]
-        _buffer_size += np.prod(param.shape) + align_
-        _param2align[param.name] = align_
-
-    # process gradient
-    grad_storage = GradStorage(
-        size=_buffer_size,
-        dtype=dtype,
-        device="gpu",
-        destination="0",
-        parm2align=_param2align,
-    )
-
-    for param in parameters:
-        grad_storage.add_grad(param, _param2align[param.name])
-
-    return grad_storage.buffer
-
-
 class FusedCommBuffer:
     def __init__(self, id, params, comm_group, acc_steps=1, act=None, dst=-1):
         self._id = id
@@ -188,8 +146,11 @@ class FusedCommBuffer:
         self._init_step_dict()
 
         self.grad_storage = flatten_dense_tensors(
-            self._params, self.use_main_grad
-        )
+            self._params,
+            use_main_grad=self.use_main_grad,
+            fuse_param=False,
+            warp_buffer=False,
+        ).buffer
 
         self._record_addr()
 
@@ -272,22 +233,3 @@ class FusedCommBuffer:
         self.grad_storage.scale_(scale_factor)
 
         self._reset_params_checked_in()
-
-
-def assign_group_by_size(parameters, group_size=128 * 1024 * 1024):
-    group_idx = 0
-    memory_counter = 0
-    var_groups = OrderedDict()
-    dtype = parameters[0].dtype
-
-    for var in parameters:
-        bytes = np.prod(var.shape) * core.size_of_dtype(var.dtype)
-        if memory_counter < group_size and dtype == var.dtype:
-            memory_counter += bytes
-        else:
-            memory_counter = bytes
-            dtype = var.dtype
-            group_idx += 1
-        var_groups.setdefault(group_idx, []).append(var)
-
-    return var_groups
diff --git a/python/paddle/distributed/fleet/utils/tensor_fusion_helper.py b/python/paddle/distributed/fleet/utils/tensor_fusion_helper.py
index e097b4686fbc8fac6460511ddf5d7dff5eb6ef58..403f9d5d9a6c151a0a83847b4e927a31bac18b04 100644
--- a/python/paddle/distributed/fleet/utils/tensor_fusion_helper.py
+++ b/python/paddle/distributed/fleet/utils/tensor_fusion_helper.py
@@ -30,8 +30,7 @@ align = {
 }
 
 
-def assign_group_by_size(parameters, group_size=256 * 1024 * 1024):
-    # TODO(Yuang Liu): make pp_utils/utils use this tensor fusion helper
+def assign_group_by_size(parameters, group_size=128 * 1024 * 1024):
     is_sparse_gradient = [False] * len(parameters)
 
     group_indices = core.eager_assign_group_by_size(
@@ -45,7 +44,9 @@ def assign_group_by_size(parameters, group_size=128 * 1024 * 1024):
     return var_groups
 
 
-def flatten_dense_tensors(parameters, use_main_grad):
+def flatten_dense_tensors(
+    parameters, use_main_grad=False, fuse_param=True, warp_buffer=False
+):
     from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_storage import (
         GradStorage,
         ParamStorage,
@@ -64,9 +65,11 @@ def flatten_dense_tensors(
         _buffer_size += np.prod(param.shape) + align_
         _param2align[param.name] = align_
 
-    param_storage = ParamStorage(size=_buffer_size, dtype=dtype, device="gpu")
-
-    param_storage.add_rank_params(parameters, _param2align)
+    if fuse_param:
+        param_storage = ParamStorage(
+            size=_buffer_size, dtype=dtype, device="gpu"
+        )
+        param_storage.add_rank_params(parameters, _param2align)
 
     # process gradient
     grad_dtype = paddle.float32 if use_main_grad else dtype
@@ -81,27 +84,35 @@ def flatten_dense_tensors(
     for param in parameters:
         grad_storage.add_grad(param, _param2align[param.name])
 
-    param_storage.warp_buffer()
-    grad_storage.warp_buffer()
+    if warp_buffer:
+        if fuse_param:
+            param_storage.warp_buffer()
+        grad_storage.warp_buffer()
 
-    if not use_main_grad:
-        # param_storage --> grad_storage
-        param_storage.buffer._copy_gradient_from(grad_storage.buffer)
+    if fuse_param:
+        if not use_main_grad:
+            # param_storage --> grad_storage
+            param_storage.buffer._copy_gradient_from(grad_storage.buffer)
+        else:
+            param_storage.buffer.main_grad = grad_storage.buffer
+            param_storage.buffer.stop_gradient = False
+        return param_storage, grad_storage
     else:
-        param_storage.buffer.main_grad = grad_storage.buffer
-        param_storage.buffer.stop_gradient = False
-    return param_storage, grad_storage
+        return grad_storage
 
 
 def obtain_storage(parameters, use_main_grad, clip, dist):
     if len(parameters) < 1:
         return []
 
-    var_groups = assign_group_by_size(parameters)
+    var_groups = assign_group_by_size(parameters, group_size=256 * 1024 * 1024)
     storage = []
     for group_idx, parameters in var_groups.items():
         param_storage, grad_storage = flatten_dense_tensors(
-            parameters, use_main_grad
+            parameters,
+            use_main_grad=use_main_grad,
+            fuse_param=True,
+            warp_buffer=True,
        )
         param_storage.buffer.need_clip = clip
         param_storage.buffer.is_distributed = dist
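
For reference, a minimal usage sketch of the relocated helpers after this change (not part of the patch). It assumes a CUDA build of Paddle running in dynamic graph mode, since the fused gradient storage is allocated on "gpu"; the paddle.nn.Linear layer is only an illustrative stand-in for a pipeline stage's trainable parameters. The call mirrors what FusedCommBuffer now does: it requests only the flattened gradient buffer, leaving fuse_param and warp_buffer disabled.

import paddle

from paddle.distributed.fleet.utils.tensor_fusion_helper import (
    assign_group_by_size,
    flatten_dense_tensors,
)

# Stand-in for a pipeline chunk's trainable parameters (illustrative only).
layer = paddle.nn.Linear(1024, 1024)
params = list(layer.parameters())

# Bucket parameters by dtype and byte size. The helper's default threshold is
# now 128 MB; obtain_storage keeps its previous behavior by passing 256 MB.
var_groups = assign_group_by_size(params)

for _, group in var_groups.items():
    # FusedCommBuffer-style call: only the fused gradient storage is needed,
    # so parameter fusion and buffer warping stay off and the flat gradient
    # tensor is taken from .buffer.
    grad_buffer = flatten_dense_tensors(
        group,
        use_main_grad=False,
        fuse_param=False,
        warp_buffer=False,
    ).buffer
    print(grad_buffer.shape, grad_buffer.dtype)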